You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2013/06/12 18:41:48 UTC
svn commit: r1492274 [2/3] - in /zookeeper/bookkeeper/trunk: ./
bookkeeper-server/src/main/java/org/apache/bookkeeper/client/
bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/
bookkeeper-server/src/main/java/org/apache/bookkeeper/net/ bookkee...
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopology.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopology.java?rev=1492274&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopology.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopology.java Wed Jun 12 16:41:47 2013
@@ -0,0 +1,876 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.net;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The class represents a cluster of computers with a hierarchical,
+ * tree-shaped network topology.
+ * For example, a cluster may consist of many data centers filled
+ * with racks of computers.
+ * In a network topology, leaves represent data nodes (computers) and inner
+ * nodes represent switches/routers that manage traffic in/out of data centers
+ * or racks.
+ *
+ */
+public class NetworkTopology {
+
+ public final static String DEFAULT_RACK = "/default-rack";
+ public final static int DEFAULT_HOST_LEVEL = 2;
+ public static final Logger LOG = LoggerFactory.getLogger(NetworkTopology.class);
+
+ /**
+  * Unchecked exception raised when an operation would leave the topology
+  * in an inconsistent shape.
+  */
+ public static class InvalidTopologyException extends RuntimeException {
+     private static final long serialVersionUID = 1L;
+
+     /**
+      * @param msg human-readable description of the inconsistency
+      */
+     public InvalidTopologyException(String msg) {
+         super(msg);
+     }
+ }
+
+ /**
+  * InnerNode represents a switch/router of a data center or rack.
+  * Different from a leaf node, it keeps a list of children, each of which is
+  * either another InnerNode or a leaf.
+  */
+ static class InnerNode extends NodeBase {
+     protected List<Node> children = new ArrayList<Node>();
+     /** Number of leaves currently stored in the subtree rooted at this node. */
+     private int numOfLeaves;
+
+     /** Construct an InnerNode from a path-like string. */
+     InnerNode(String path) {
+         super(path);
+     }
+
+     /** Construct an InnerNode from its name and its network location. */
+     InnerNode(String name, String location) {
+         super(name, location);
+     }
+
+     /** Construct an InnerNode from its name, its network location, its parent, and its level. */
+     InnerNode(String name, String location, InnerNode parent, int level) {
+         super(name, location, parent, level);
+     }
+
+     /** @return the live (not copied) list of children */
+     List<Node> getChildren() {
+         return children;
+     }
+
+     /** @return the number of direct children this node has */
+     int getNumOfChildren() {
+         return children.size();
+     }
+
+     /**
+      * Judge if this node represents a rack.
+      * Only the first child is inspected.
+      *
+      * @return true if it has no child or its first child is not an InnerNode
+      */
+     boolean isRack() {
+         if (children.isEmpty()) {
+             return true;
+         }
+         return !(children.get(0) instanceof InnerNode);
+     }
+
+     /**
+      * Judge if this node is an ancestor of node <i>n</i>.
+      * The check is purely textual: it compares this node's path with the
+      * network location of <i>n</i>, each with a trailing separator appended so
+      * that "/rack1" is not mistaken for an ancestor of "/rack10/x".
+      *
+      * @param n a node
+      * @return true if this node is an ancestor of <i>n</i>
+      */
+     boolean isAncestor(Node n) {
+         return getPath(this).equals(NodeBase.PATH_SEPARATOR_STR)
+                 || (n.getNetworkLocation() + NodeBase.PATH_SEPARATOR_STR).startsWith(getPath(this)
+                         + NodeBase.PATH_SEPARATOR_STR);
+     }
+
+     /**
+      * Judge if this node is the parent of node <i>n</i>.
+      *
+      * @param n a node
+      * @return true if the network location of <i>n</i> equals this node's path
+      */
+     boolean isParent(Node n) {
+         return n.getNetworkLocation().equals(getPath(this));
+     }
+
+     /**
+      * Find the position of the direct child with the given name.
+      *
+      * @param childName name to look for
+      * @return index into {@link #children}, or -1 if no child has that name
+      */
+     private int indexOfChild(String childName) {
+         for (int i = 0; i < children.size(); i++) {
+             if (children.get(i).getName().equals(childName)) {
+                 return i;
+             }
+         }
+         return -1;
+     }
+
+     /**
+      * Return the name of the child of this node that is an ancestor of node <i>n</i>.
+      * Callers in this class only invoke it when this node is an ancestor, but not
+      * the parent, of <i>n</i>.
+      *
+      * @throws IllegalArgumentException if this node is not an ancestor of <i>n</i>
+      */
+     private String getNextAncestorName(Node n) {
+         if (!isAncestor(n)) {
+             throw new IllegalArgumentException(this + " is not an ancestor of " + n);
+         }
+         // strip this node's own path, then a leading separator, then everything
+         // after the first remaining separator
+         String name = n.getNetworkLocation().substring(getPath(this).length());
+         if (name.charAt(0) == PATH_SEPARATOR) {
+             name = name.substring(1);
+         }
+         int index = name.indexOf(PATH_SEPARATOR);
+         if (index != -1) {
+             name = name.substring(0, index);
+         }
+         return name;
+     }
+
+     /**
+      * Add node <i>n</i> to the subtree of this node, creating intermediate
+      * InnerNodes as needed.
+      *
+      * @param n node to be added
+      * @return true if the node is added; false if a child with the same name
+      *         already existed (that child is replaced by <i>n</i>)
+      * @throws IllegalArgumentException if this node is not an ancestor of <i>n</i>
+      */
+     boolean add(Node n) {
+         if (!isAncestor(n)) {
+             throw new IllegalArgumentException(n.getName() + ", which is located at " + n.getNetworkLocation()
+                     + ", is not a descendent of " + getPath(this));
+         }
+         if (isParent(n)) {
+             // this node is the parent of n; add n directly
+             n.setParent(this);
+             n.setLevel(this.level + 1);
+             int existing = indexOfChild(n.getName());
+             if (existing != -1) {
+                 // same name already present: replace it, leaf count unchanged
+                 children.set(existing, n);
+                 return false;
+             }
+             children.add(n);
+             numOfLeaves++;
+             return true;
+         }
+         // find (or create) the child that leads towards n
+         String parentName = getNextAncestorName(n);
+         int idx = indexOfChild(parentName);
+         InnerNode parentNode;
+         if (idx != -1) {
+             parentNode = (InnerNode) children.get(idx);
+         } else {
+             parentNode = createParentNode(parentName);
+             children.add(parentNode);
+         }
+         // add n to the subtree of the next ancestor node
+         if (parentNode.add(n)) {
+             numOfLeaves++;
+             return true;
+         }
+         return false;
+     }
+
+     /**
+      * Creates a parent node to be added to the list of children.
+      * Creates a node using the InnerNode four argument constructor specifying
+      * the name, location, parent, and level of this node.
+      *
+      * <p>To be overridden in subclasses for specific InnerNode implementations,
+      * as alternative to overriding the full {@link #add(Node)} method.
+      *
+      * @param parentName The name of the parent node
+      * @return A new inner node
+      * @see InnerNode#InnerNode(String, String, InnerNode, int)
+      */
+     protected InnerNode createParentNode(String parentName) {
+         return new InnerNode(parentName, getPath(this), this, this.getLevel() + 1);
+     }
+
+     /**
+      * Remove node <i>n</i> from the subtree of this node. Intermediate
+      * InnerNodes that become empty are removed as well.
+      *
+      * @param n node to be deleted
+      * @return true if the node is deleted; false otherwise
+      * @throws IllegalArgumentException if this node is not an ancestor of <i>n</i>
+      */
+     boolean remove(Node n) {
+         String parent = n.getNetworkLocation();
+         String currentPath = getPath(this);
+         if (!isAncestor(n)) {
+             throw new IllegalArgumentException(n.getName() + ", which is located at " + parent
+                     + ", is not a descendent of " + currentPath);
+         }
+         if (isParent(n)) {
+             // this node is the parent of n; remove n directly
+             int idx = indexOfChild(n.getName());
+             if (idx == -1) {
+                 return false;
+             }
+             children.remove(idx);
+             numOfLeaves--;
+             n.setParent(null);
+             return true;
+         }
+         // find the next ancestor node: the parent node
+         String parentName = getNextAncestorName(n);
+         int idx = indexOfChild(parentName);
+         if (idx == -1) {
+             return false;
+         }
+         InnerNode parentNode = (InnerNode) children.get(idx);
+         boolean isRemoved = parentNode.remove(n);
+         if (isRemoved) {
+             // if the parent node has no children left, remove the parent node too
+             if (parentNode.getNumOfChildren() == 0) {
+                 children.remove(idx);
+             }
+             numOfLeaves--;
+         }
+         return isRemoved;
+     }
+
+     /**
+      * Given a node's string representation, return a reference to the node.
+      *
+      * @param loc string location of the form rack/node, relative to this node
+      *            (no leading separator); null or empty denotes this node itself
+      * @return null if the node is not found or the childnode is there but
+      *         not an instance of {@link InnerNode}
+      */
+     private Node getLoc(String loc) {
+         if (loc == null || loc.length() == 0) {
+             return this;
+         }
+         // path[0] is the direct child name, path[1] (if present) the remainder
+         String[] path = loc.split(PATH_SEPARATOR_STR, 2);
+         int idx = indexOfChild(path[0]);
+         if (idx == -1) {
+             return null; // non-existing node
+         }
+         Node childnode = children.get(idx);
+         if (path.length == 1) {
+             return childnode;
+         }
+         if (childnode instanceof InnerNode) {
+             return ((InnerNode) childnode).getLoc(path[1]);
+         }
+         return null;
+     }
+
+     /**
+      * Get the <i>leafIndex</i>-th leaf of this subtree, counting only leaves
+      * that are not covered by <i>excludedNode</i>.
+      *
+      * @param leafIndex an indexed leaf of the node
+      * @param excludedNode an excluded node (can be null)
+      * @return the selected leaf, or null if the index is out of range
+      */
+     Node getLeaf(int leafIndex, Node excludedNode) {
+         int count = 0;
+         // check if the excluded node a leaf
+         boolean isLeaf = excludedNode == null || !(excludedNode instanceof InnerNode);
+         // calculate the total number of excluded leaf nodes
+         int numOfExcludedLeaves = isLeaf ? 1 : ((InnerNode) excludedNode).getNumOfLeaves();
+         if (isLeafParent()) { // children are leaves
+             if (isLeaf) { // excluded node is a leaf node
+                 int excludedIndex = children.indexOf(excludedNode);
+                 if (excludedIndex != -1 && leafIndex >= 0) {
+                     // excluded node is one of the children so adjust the leaf index
+                     leafIndex = leafIndex >= excludedIndex ? leafIndex + 1 : leafIndex;
+                 }
+             }
+             // range check
+             if (leafIndex < 0 || leafIndex >= this.getNumOfChildren()) {
+                 return null;
+             }
+             return children.get(leafIndex);
+         }
+         for (int i = 0; i < children.size(); i++) {
+             InnerNode child = (InnerNode) children.get(i);
+             if (excludedNode == null || excludedNode != child) {
+                 // not the excludedNode
+                 int childLeaves = child.getNumOfLeaves();
+                 if (excludedNode != null && child.isAncestor(excludedNode)) {
+                     childLeaves -= numOfExcludedLeaves;
+                 }
+                 if (count + childLeaves > leafIndex) {
+                     // the leaf is in the child subtree
+                     return child.getLeaf(leafIndex - count, excludedNode);
+                 }
+                 // go to the next child
+                 count = count + childLeaves;
+             } else {
+                 // it is the excludedNode: skip it and stop excluding from here on
+                 excludedNode = null;
+             }
+         }
+         return null;
+     }
+
+     /**
+      * Determine if the children of this node are leaves; this is the check
+      * {@link #getLeaf(int, Node)} consults. The default implementation calls
+      * {@link #isRack()}.
+      *
+      * @return true if children are leaves, false otherwise
+      */
+     protected boolean isLeafParent() {
+         return isRack();
+     }
+
+     /**
+      * Determine if children are leaves, default implementation calls {@link #isRack()}.
+      * <p>Note that {@link #getLeaf(int, Node)} calls {@link #isLeafParent()},
+      * not this method.
+      *
+      * @return true if children are leaves, false otherwise
+      */
+     protected boolean areChildrenLeaves() {
+         return isRack();
+     }
+
+     /**
+      * Get number of leaves.
+      */
+     int getNumOfLeaves() {
+         return numOfLeaves;
+     }
+ } // end of InnerNode
+
+ /**
+ * the root cluster map
+ */
+ InnerNode clusterMap;
+ /** Depth of all leaf nodes */
+ private int depthOfAllLeaves = -1;
+ /** rack counter */
+ protected int numOfRacks = 0;
+ /** the lock used to manage access */
+ protected ReadWriteLock netlock = new ReentrantReadWriteLock();
+
+ /** Create an empty topology whose cluster map consists of the root node only. */
+ public NetworkTopology() {
+     this.clusterMap = new InnerNode(NodeBase.ROOT);
+ }
+
+ /** Add a leaf node
+ * Update node counter & rack counter if necessary
+ * @param node node to be added; can be null
+ * @exception IllegalArgumentException if add a node to a leave
+ or node to be added is not a leaf
+ */
+ public void add(Node node) {
+ if (node == null)
+ return;
+ String oldTopoStr = this.toString();
+ if (node instanceof InnerNode) {
+ throw new IllegalArgumentException("Not allow to add an inner node: " + NodeBase.getPath(node));
+ }
+ int newDepth = NodeBase.locationToDepth(node.getNetworkLocation()) + 1;
+ netlock.writeLock().lock();
+ try {
+ if ((depthOfAllLeaves != -1) && (depthOfAllLeaves != newDepth)) {
+ LOG.error("Error: can't add leaf node at depth " + newDepth + " to topology:\n" + oldTopoStr);
+ throw new InvalidTopologyException("Invalid network topology. "
+ + "You cannot have a rack and a non-rack node at the same level of the network topology.");
+ }
+ Node rack = getNodeForNetworkLocation(node);
+ if (rack != null && !(rack instanceof InnerNode)) {
+ throw new IllegalArgumentException("Unexpected data node " + node.toString()
+ + " at an illegal network location");
+ }
+ if (clusterMap.add(node)) {
+ LOG.info("Adding a new node: " + NodeBase.getPath(node));
+ if (rack == null) {
+ numOfRacks++;
+ }
+ if (!(node instanceof InnerNode)) {
+ if (depthOfAllLeaves == -1) {
+ depthOfAllLeaves = node.getLevel();
+ }
+ }
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("NetworkTopology became:\n" + this.toString());
+ }
+ } finally {
+ netlock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Return a reference to the node given its string representation.
+ * Default implementation delegates to {@link #getNode(String)}.
+ *
+ * <p>To be overridden in subclasses for specific NetworkTopology
+ * implementations, as alternative to overriding the full {@link #add(Node)}
+ * method.
+ *
+ * @param node The string representation of this node's network location is
+ * used to retrieve a Node object.
+ * @return a reference to the node; null if the node is not in the tree
+ *
+ * @see #add(Node)
+ * @see #getNode(String)
+ */
+ protected Node getNodeForNetworkLocation(Node node) {
+ return getNode(node.getNetworkLocation());
+ }
+
+ /**
+  * Given a string representation of a rack, return its children.
+  *
+  * @param loc a path-like string representation of a rack
+  * @return a newly allocated list with all the node's children, or null if
+  *         <i>loc</i> does not resolve to an inner node (it is missing, or it
+  *         names a leaf)
+  */
+ public List<Node> getDatanodesInRack(String loc) {
+     netlock.readLock().lock();
+     try {
+         loc = NodeBase.normalize(loc);
+         if (!NodeBase.ROOT.equals(loc)) {
+             // getLoc expects a path relative to the root, without leading separator
+             loc = loc.substring(1);
+         }
+         Node rack = clusterMap.getLoc(loc);
+         // getLoc may return a leaf; a blind cast would throw ClassCastException
+         if (!(rack instanceof InnerNode)) {
+             return null;
+         }
+         return new ArrayList<Node>(((InnerNode) rack).getChildren());
+     } finally {
+         netlock.readLock().unlock();
+     }
+ }
+
+ /**
+  * Remove a leaf node.
+  * Updates the rack counter if the node's rack disappears, and clears the
+  * recorded leaf depth once the topology holds no leaves any more.
+  *
+  * @param node node to be removed; can be null, in which case nothing happens
+  * @throws IllegalArgumentException if the node is an inner node
+  */
+ public void remove(Node node) {
+     if (node == null) {
+         return;
+     }
+     if (node instanceof InnerNode) {
+         throw new IllegalArgumentException("Not allow to remove an inner node: " + NodeBase.getPath(node));
+     }
+     LOG.info("Removing a node: " + NodeBase.getPath(node));
+     netlock.writeLock().lock();
+     try {
+         if (clusterMap.remove(node)) {
+             // the rack is gone if its location no longer resolves
+             Node rack = getNode(node.getNetworkLocation());
+             if (rack == null) {
+                 numOfRacks--;
+             }
+             // with no leaves left there is no depth to stay consistent with
+             if (clusterMap.getNumOfLeaves() == 0) {
+                 depthOfAllLeaves = -1;
+             }
+         }
+         if (LOG.isDebugEnabled()) {
+             LOG.debug("NetworkTopology became:\n" + this.toString());
+         }
+     } finally {
+         netlock.writeLock().unlock();
+     }
+ }
+
+ /** Check if the tree contains node <i>node</i>
+ *
+ * @param node a node
+ * @return true if <i>node</i> is already in the tree; false otherwise
+ */
+ public boolean contains(Node node) {
+ if (node == null)
+ return false;
+ netlock.readLock().lock();
+ try {
+ Node parent = node.getParent();
+ for (int level = node.getLevel(); parent != null && level > 0; parent = parent.getParent(), level--) {
+ if (parent == clusterMap) {
+ return true;
+ }
+ }
+ } finally {
+ netlock.readLock().unlock();
+ }
+ return false;
+ }
+
+ /**
+  * Resolve a path-like string to the node it denotes.
+  *
+  * @param loc
+  *            a path-like string representation of a node
+  * @return a reference to the node; null if the node is not in the tree
+  */
+ public Node getNode(String loc) {
+     netlock.readLock().lock();
+     try {
+         final String normalized = NodeBase.normalize(loc);
+         // drop the leading separator unless the path denotes the root itself
+         final String relative = NodeBase.ROOT.equals(normalized) ? normalized : normalized.substring(1);
+         return clusterMap.getLoc(relative);
+     } finally {
+         netlock.readLock().unlock();
+     }
+ }
+
+ /**
+  * Map a network location to the string representation of its rack.
+  * This implementation returns the location unchanged.
+  *
+  * <p>To be overridden in subclasses for specific NetworkTopology
+  * implementations.
+  *
+  * @param loc
+  *            a path-like string representation of a network location
+  * @return a rack string
+  */
+ public String getRack(String loc) {
+     final String rack = loc;
+     return rack;
+ }
+
+ /**
+  * Read the rack counter under the read lock.
+  *
+  * @return the total number of racks
+  */
+ public int getNumOfRacks() {
+     netlock.readLock().lock();
+     try {
+         final int racks = numOfRacks;
+         return racks;
+     } finally {
+         netlock.readLock().unlock();
+     }
+ }
+
+ /**
+  * Read the leaf count of the root node under the read lock.
+  *
+  * @return the total number of leaf nodes
+  */
+ public int getNumOfLeaves() {
+     netlock.readLock().lock();
+     try {
+         final int leaves = clusterMap.getNumOfLeaves();
+         return leaves;
+     } finally {
+         netlock.readLock().unlock();
+     }
+ }
+
+ /**
+  * Return the distance between two nodes.
+  * It is assumed that the distance from one node to its parent is 1.
+  * The distance between two nodes is calculated by summing up their distances
+  * to their closest common ancestor.
+  *
+  * @param node1 one node
+  * @param node2 another node
+  * @return the distance between node1 and node2, which is zero if they are the
+  *         same reference (including both null), or {@link Integer#MAX_VALUE}
+  *         if exactly one of them is null or the walk up the parent chain runs
+  *         off the tree
+  */
+ public int getDistance(Node node1, Node node2) {
+     if (node1 == node2) {
+         return 0;
+     }
+     if (node1 == null || node2 == null) {
+         // previously this dereferenced the null argument and threw NPE
+         LOG.warn("One of the nodes is a null pointer");
+         return Integer.MAX_VALUE;
+     }
+     Node n1 = node1, n2 = node2;
+     int dis = 0;
+     netlock.readLock().lock();
+     try {
+         int level1 = node1.getLevel(), level2 = node2.getLevel();
+         // bring the deeper node up to the level of the shallower one
+         while (n1 != null && level1 > level2) {
+             n1 = n1.getParent();
+             level1--;
+             dis++;
+         }
+         while (n2 != null && level2 > level1) {
+             n2 = n2.getParent();
+             level2--;
+             dis++;
+         }
+         // climb in lock-step until both share the same parent
+         while (n1 != null && n2 != null && n1.getParent() != n2.getParent()) {
+             n1 = n1.getParent();
+             n2 = n2.getParent();
+             dis += 2;
+         }
+     } finally {
+         netlock.readLock().unlock();
+     }
+     if (n1 == null) {
+         LOG.warn("The cluster does not contain node: " + NodeBase.getPath(node1));
+         return Integer.MAX_VALUE;
+     }
+     if (n2 == null) {
+         LOG.warn("The cluster does not contain node: " + NodeBase.getPath(node2));
+         return Integer.MAX_VALUE;
+     }
+     // one more hop from each side up to the shared parent
+     return dis + 2;
+ }
+
+ /**
+  * Check if two nodes are on the same rack.
+  * The comparison itself is delegated to {@link #isSameParents(Node, Node)}
+  * while holding the read lock.
+  *
+  * @param node1 one node (can be null)
+  * @param node2 another node (can be null)
+  * @return true if node1 and node2 are on the same rack; false otherwise,
+  *         including when either argument is null
+  */
+ public boolean isOnSameRack(Node node1, Node node2) {
+     if (node1 != null && node2 != null) {
+         netlock.readLock().lock();
+         try {
+             return isSameParents(node1, node2);
+         } finally {
+             netlock.readLock().unlock();
+         }
+     }
+     return false;
+ }
+
+ /**
+  * Tell whether this topology is aware of NodeGroups.
+  *
+  * @return always false in this implementation
+  */
+ public boolean isNodeGroupAware() {
+     return false;
+ }
+
+ /**
+  * Check if two nodes are on the same NodeGroup. This topology is not aware
+  * of NodeGroups, so the answer is always false; to be overridden in subclasses.
+  *
+  * @param node1 one node (not inspected here)
+  * @param node2 another node (not inspected here)
+  * @return always false in this implementation
+  */
+ public boolean isOnSameNodeGroup(Node node1, Node node2) {
+     return false;
+ }
+
+ /**
+ * Compare the parents of each node for equality
+ *
+ * <p>To be overridden in subclasses for specific NetworkTopology
+ * implementations, as alternative to overriding the full
+ * {@link #isOnSameRack(Node, Node)} method.
+ *
+ * @param node1 the first node to compare
+ * @param node2 the second node to compare
+ * @return true if their parents are equal, false otherwise
+ *
+ * @see #isOnSameRack(Node, Node)
+ */
+ protected boolean isSameParents(Node node1, Node node2) {
+ return node1.getParent() == node2.getParent();
+ }
+
+ final protected static Random r = new Random();
+
+ /**
+  * Randomly choose one node from <i>scope</i>.
+  * If scope starts with ~, choose one from all nodes except for the
+  * ones in <i>scope</i>; otherwise, choose one from <i>scope</i>.
+  *
+  * @param scope range of nodes from which a node will be chosen
+  * @return the chosen node
+  */
+ public Node chooseRandom(String scope) {
+     netlock.readLock().lock();
+     try {
+         if (!scope.startsWith("~")) {
+             return chooseRandom(scope, null);
+         }
+         // "~x" means: anything under the root except x
+         final String excluded = scope.substring(1);
+         return chooseRandom(NodeBase.ROOT, excluded);
+     } finally {
+         netlock.readLock().unlock();
+     }
+ }
+
+ /**
+  * Randomly choose one leaf under <i>scope</i> that is not under
+  * <i>excludedScope</i>.
+  *
+  * @param scope path of the subtree to choose from
+  * @param excludedScope path of a subtree to leave out; may be null
+  * @return the chosen node; the node at <i>scope</i> itself if it is not an
+  *         inner node (null if it does not exist); null if <i>scope</i> lies
+  *         inside <i>excludedScope</i> or no candidate leaf remains
+  */
+ private Node chooseRandom(String scope, String excludedScope) {
+     if (excludedScope != null) {
+         if (scope.startsWith(excludedScope)) {
+             // everything in scope is excluded
+             return null;
+         }
+         if (!excludedScope.startsWith(scope)) {
+             // the excluded subtree lies outside scope and can be ignored
+             excludedScope = null;
+         }
+     }
+     Node node = getNode(scope);
+     if (!(node instanceof InnerNode)) {
+         return node;
+     }
+     InnerNode innerNode = (InnerNode) node;
+     int numOfDatanodes = innerNode.getNumOfLeaves();
+     Node excludedNode = (excludedScope == null) ? null : getNode(excludedScope);
+     if (excludedNode instanceof InnerNode) {
+         numOfDatanodes -= ((InnerNode) excludedNode).getNumOfLeaves();
+     } else if (excludedNode != null) {
+         // an existing leaf is excluded; a non-existent excluded scope must not
+         // reduce the number of candidates
+         numOfDatanodes -= 1;
+     }
+     if (numOfDatanodes <= 0) {
+         // Random.nextInt requires a positive bound; nothing to choose from
+         return null;
+     }
+     int leaveIndex = r.nextInt(numOfDatanodes);
+     return innerNode.getLeaf(leaveIndex, excludedNode);
+ }
+
+ /**
+  * Return the leaves in <i>scope</i>.
+  *
+  * @param scope a path string
+  * @return the leaf nodes under the given scope; the node itself if the scope
+  *         denotes a leaf; an empty set if the scope does not resolve to a node
+  */
+ private Set<Node> doGetLeaves(String scope) {
+     Node node = getNode(scope);
+     Set<Node> leafNodes = new HashSet<Node>();
+     if (node == null) {
+         // unknown scope: do not leak a null element into the result
+         return leafNodes;
+     }
+     if (!(node instanceof InnerNode)) {
+         leafNodes.add(node);
+     } else {
+         InnerNode innerNode = (InnerNode) node;
+         for (int i = 0; i < innerNode.getNumOfLeaves(); i++) {
+             leafNodes.add(innerNode.getLeaf(i, null));
+         }
+     }
+     return leafNodes;
+ }
+
+ /**
+  * Return the leaves selected by <i>scope</i>.
+  * If scope starts with ~, the result is every leaf of the topology except
+  * those under the path that follows the ~.
+  *
+  * @param scope a path string that may start with ~
+  * @return a set of leaf nodes
+  */
+ public Set<Node> getLeaves(String scope) {
+     netlock.readLock().lock();
+     try {
+         if (!scope.startsWith("~")) {
+             return doGetLeaves(scope);
+         }
+         final Set<Node> remaining = doGetLeaves(NodeBase.ROOT);
+         final Set<Node> excluded = doGetLeaves(scope.substring(1));
+         remaining.removeAll(excluded);
+         return remaining;
+     } finally {
+         netlock.readLock().unlock();
+     }
+ }
+
+ /**
+  * Return the number of leaves in <i>scope</i> but not in <i>excludedNodes</i>.
+  * If scope starts with ~, return the number of nodes that are not
+  * in <i>scope</i> and <i>excludedNodes</i>.
+  *
+  * @param scope a path string that may start with ~
+  * @param excludedNodes a list of nodes
+  * @return number of available nodes
+  */
+ public int countNumOfAvailableNodes(String scope, Collection<Node> excludedNodes) {
+     boolean isExcluded = false;
+     if (scope.startsWith("~")) {
+         isExcluded = true;
+         scope = scope.substring(1);
+     }
+     scope = NodeBase.normalize(scope);
+     int count = 0; // the number of nodes in both scope & excludedNodes
+     netlock.readLock().lock();
+     try {
+         for (Node node : excludedNodes) {
+             // trailing separators prevent "/rack1" from matching "/rack10/..."
+             if ((NodeBase.getPath(node) + NodeBase.PATH_SEPARATOR_STR).startsWith(scope
+                     + NodeBase.PATH_SEPARATOR_STR)) {
+                 count++;
+             }
+         }
+         Node n = getNode(scope);
+         // a scope that does not exist contributes no nodes (it used to count as one)
+         int scopeNodeCount = 0;
+         if (n instanceof InnerNode) {
+             scopeNodeCount = ((InnerNode) n).getNumOfLeaves();
+         } else if (n != null) {
+             scopeNodeCount = 1;
+         }
+         if (isExcluded) {
+             return clusterMap.getNumOfLeaves() - scopeNodeCount - excludedNodes.size() + count;
+         } else {
+             return scopeNodeCount - count;
+         }
+     } finally {
+         netlock.readLock().unlock();
+     }
+ }
+
+ /**
+  * Convert a network tree to a string: the rack count, the leaf count, and
+  * then one line per leaf path.
+  * The read lock is held for the whole dump so that the counters and the
+  * leaf walk describe the same state of the tree.
+  */
+ @Override
+ public String toString() {
+     netlock.readLock().lock();
+     try {
+         // print the number of racks
+         StringBuilder tree = new StringBuilder();
+         tree.append("Number of racks: ");
+         tree.append(numOfRacks);
+         tree.append("\n");
+         // print the number of leaves
+         int numOfLeaves = getNumOfLeaves();
+         tree.append("Expected number of leaves:");
+         tree.append(numOfLeaves);
+         tree.append("\n");
+         // print nodes
+         for (int i = 0; i < numOfLeaves; i++) {
+             tree.append(NodeBase.getPath(clusterMap.getLeaf(i, null)));
+             tree.append("\n");
+         }
+         return tree.toString();
+     } finally {
+         netlock.readLock().unlock();
+     }
+ }
+
+ /**
+  * Split a network location string at its last separator and return the
+  * part before that separator.
+  *
+  * @param networkLocation the location string; it must contain a separator,
+  *        otherwise {@code substring} is called with an end index of -1 and throws
+  * @return everything before the last separator
+  */
+ public static String getFirstHalf(String networkLocation) {
+     final int cut = networkLocation.lastIndexOf(NodeBase.PATH_SEPARATOR_STR);
+     final String head = networkLocation.substring(0, cut);
+     return head;
+ }
+
+ /**
+  * Split a network location string at its last separator and return the
+  * part starting at that separator (the separator itself is included).
+  *
+  * @param networkLocation the location string; it must contain a separator,
+  *        otherwise {@code substring} is called with a begin index of -1 and throws
+  * @return the last separator and everything after it
+  */
+ public static String getLastHalf(String networkLocation) {
+     final int cut = networkLocation.lastIndexOf(NodeBase.PATH_SEPARATOR_STR);
+     final String tail = networkLocation.substring(cut);
+     return tail;
+ }
+
+ /** swap two array items */
+ static protected void swap(Node[] nodes, int i, int j) {
+ Node tempNode;
+ tempNode = nodes[j];
+ nodes[j] = nodes[i];
+ nodes[i] = tempNode;
+ }
+
+ /** Partially sort the nodes array by distance to <i>reader</i>.
+ * It linearly scans the array; if a local node (the same object as
+ * <i>reader</i>) is found, it is swapped with the first element of the array.
+ * If a local rack node (see {@link #isOnSameRack(Node, Node)}) is found, it is
+ * swapped with the first element following the local node, or with the first
+ * element if there is no local node.
+ * If neither a local node nor a local rack node is found (or <i>reader</i> is
+ * null), the element at position 0 is swapped with a randomly chosen element.
+ * The remaining nodes are left untouched, and the scan stops as soon as both
+ * a local node and a local rack node have been found.
+ * @param reader the node that wishes to read a block from one of the nodes
+ * @param nodes the list of nodes containing data for the reader; modified in place
+ */
+ public void pseudoSortByDistance(Node reader, Node[] nodes) {
+ int tempIndex = 0;
+ int localRackNode = -1;
+ if (reader != null) {
+ // scan the array to find the local node & the first local rack node;
+ // tempIndex stays 0 until the local node is placed, localRackNode stays -1 until one is seen
+ for (int i = 0; i < nodes.length; i++) {
+ if (tempIndex == 0 && reader == nodes[i]) { // local node (identity comparison)
+ // swap the local node and the node at position 0
+ if (i != 0) {
+ swap(nodes, tempIndex, i);
+ }
+ tempIndex = 1;
+ if (localRackNode != -1) {
+ if (localRackNode == 0) {
+ localRackNode = i;
+ }
+ break;
+ }
+ } else if (localRackNode == -1 && isOnSameRack(reader, nodes[i])) {
+ // first local rack node: remember its index; stop if the local node is already placed
+ localRackNode = i;
+ if (tempIndex != 0)
+ break;
+ }
+ }
+
+ // swap the local rack node and the node at position tempIndex (right after the local node, if any)
+ if (localRackNode != -1 && localRackNode != tempIndex) {
+ swap(nodes, tempIndex, localRackNode);
+ tempIndex++;
+ }
+ }
+
+ // nothing local was found: put a random node at position 0
+ if (tempIndex == 0 && localRackNode == -1 && nodes.length != 0) {
+ swap(nodes, 0, r.nextInt(nodes.length));
+ }
+ }
+
+}
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/Node.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/Node.java?rev=1492274&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/Node.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/Node.java Wed Jun 12 16:41:47 2013
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.net;
+
+import com.google.common.annotations.Beta;
+
+ /**
+  * The interface defines a node in a network topology.
+  * A node may be a leaf representing a data node or an inner
+  * node representing a datacenter or rack.
+  * Each node has a name, and its location in the network is
+  * described by a string with syntax similar to a file name.
+  * For example, a data node's name is hostname:port# and if it's located at
+  * rack "orange" in datacenter "dog", the string representation of its
+  * network location is /dog/orange
+  */
+ @Beta
+ public interface Node {
+     /** @return the string representation of this node's network location */
+     String getNetworkLocation();
+
+     /**
+      * Set this node's network location.
+      *
+      * @param location the location
+      */
+     void setNetworkLocation(String location);
+
+     /** @return this node's name */
+     String getName();
+
+     /** @return this node's parent */
+     Node getParent();
+
+     /**
+      * Set this node's parent.
+      *
+      * @param parent the parent
+      */
+     void setParent(Node parent);
+
+     /**
+      * @return this node's level in the tree.
+      *         E.g. the root of a tree returns 0 and its children return 1
+      */
+     int getLevel();
+
+     /**
+      * Set this node's level in the tree.
+      *
+      * @param i the level
+      */
+     void setLevel(int i);
+ }
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NodeBase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NodeBase.java?rev=1492274&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NodeBase.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NodeBase.java Wed Jun 12 16:41:47 2013
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.net;
+
+/** A base class that implements interface Node
+ *
+ */
+public class NodeBase implements Node {
+ /** Path separator {@value} */
+ public final static char PATH_SEPARATOR = '/';
+ /** Path separator as a string {@value} */
+ public final static String PATH_SEPARATOR_STR = "/";
+ /** string representation of root {@value} */
+ public final static String ROOT = "";
+
+ protected String name; //host:port#
+ protected String location; //string representation of this node's location
+ protected int level; //which level of the tree the node resides
+ protected Node parent; //its parent
+
+ /** Default constructor */
+ public NodeBase() {
+ }
+
+ /** Construct a node from its path
+ * @param path
+ * a concatenation of this node's location, the path separator, and its name
+ */
+ public NodeBase(String path) {
+ path = normalize(path);
+ int index = path.lastIndexOf(PATH_SEPARATOR);
+ if (index == -1) {
+ set(ROOT, path);
+ } else {
+ set(path.substring(index + 1), path.substring(0, index));
+ }
+ }
+
+ /** Construct a node from its name and its location
+ * @param name this node's name (can be null, must not contain {@link #PATH_SEPARATOR})
+ * @param location this node's location
+ */
+ public NodeBase(String name, String location) {
+ set(name, normalize(location));
+ }
+
+ /** Construct a node from its name and its location
+ * @param name this node's name (can be null, must not contain {@link #PATH_SEPARATOR})
+ * @param location this node's location
+ * @param parent this node's parent node
+ * @param level this node's level in the tree
+ */
+ public NodeBase(String name, String location, Node parent, int level) {
+ set(name, normalize(location));
+ this.parent = parent;
+ this.level = level;
+ }
+
+ /**
+ * set this node's name and location
+ * @param name the (nullable) name -which cannot contain the {@link #PATH_SEPARATOR}
+ * @param location the location
+ */
+ private void set(String name, String location) {
+ if (name != null && name.contains(PATH_SEPARATOR_STR))
+ throw new IllegalArgumentException("Network location name contains /: " + name);
+ this.name = (name == null) ? "" : name;
+ this.location = location;
+ }
+
+ /** @return this node's name */
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ /** @return this node's network location */
+ @Override
+ public String getNetworkLocation() {
+ return location;
+ }
+
+ /** Set this node's network location
+ * @param location the location
+ */
+ @Override
+ public void setNetworkLocation(String location) {
+ this.location = location;
+ }
+
+ /**
+ * Get the path of a node
+ * @param node a non-null node
+ * @return the path of a node
+ */
+ public static String getPath(Node node) {
+ return node.getNetworkLocation() + PATH_SEPARATOR_STR + node.getName();
+ }
+
+ /** @return this node's path as its string representation */
+ @Override
+ public String toString() {
+ return getPath(this);
+ }
+
+ /** Normalize a path by stripping off any trailing {@link #PATH_SEPARATOR}
+ * @param path path to normalize.
+ * @return the normalised path
+ * If <i>path</i> is null or empty, {@link #ROOT} is returned
+ * @throws IllegalArgumentException if the first character of a non empty path
+ * is not {@link #PATH_SEPARATOR}
+ */
+ public static String normalize(String path) {
+ if (path == null || path.length() == 0)
+ return ROOT;
+
+ if (path.charAt(0) != PATH_SEPARATOR) {
+ throw new IllegalArgumentException("Network Location path does not start with " + PATH_SEPARATOR_STR + ": "
+ + path);
+ }
+
+ int len = path.length();
+ if (path.charAt(len - 1) == PATH_SEPARATOR) {
+ return path.substring(0, len - 1);
+ }
+ return path;
+ }
+
+ /** @return this node's parent */
+ @Override
+ public Node getParent() {
+ return parent;
+ }
+
+ /** Set this node's parent
+ * @param parent the parent
+ */
+ @Override
+ public void setParent(Node parent) {
+ this.parent = parent;
+ }
+
+ /** @return this node's level in the tree.
+ * E.g. the root of a tree returns 0 and its children return 1
+ */
+ @Override
+ public int getLevel() {
+ return level;
+ }
+
+ /** Set this node's level in the tree
+ * @param level the level
+ */
+ @Override
+ public void setLevel(int level) {
+ this.level = level;
+ }
+
+ public static int locationToDepth(String location) {
+ String normalizedLocation = normalize(location);
+ int length = normalizedLocation.length();
+ int depth = 0;
+ for (int i = 0; i < length; i++) {
+ if (normalizedLocation.charAt(i) == PATH_SEPARATOR) {
+ depth++;
+ }
+ }
+ return depth;
+ }
+}
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/ScriptBasedMapping.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/ScriptBasedMapping.java?rev=1492274&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/ScriptBasedMapping.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/ScriptBasedMapping.java Wed Jun 12 16:41:47 2013
@@ -0,0 +1,264 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.net;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.StringTokenizer;
+
+import org.apache.bookkeeper.util.Shell.ShellCommandExecutor;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * This class implements the {@link DNSToSwitchMapping} interface using a
+ * script configured via the
+ * {@link CommonConfigurationKeys#NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY} option.
+ * <p/>
+ * It contains a static class <code>RawScriptBasedMapping</code> that performs
+ * the work: reading the configuration parameters, executing any defined
+ * script, handling errors and such like. The outer
+ * class extends {@link CachedDNSToSwitchMapping} to cache the delegated
+ * queries.
+ * <p/>
+ * This DNS mapper's {@link #isSingleSwitch()} predicate returns
+ * true if and only if no script is defined.
+ */
+public final class ScriptBasedMapping extends CachedDNSToSwitchMapping {
+
+ /**
+ * Minimum number of arguments: {@value}
+ */
+ static final int MIN_ALLOWABLE_ARGS = 1;
+
+ /**
+ * Default number of arguments: {@value}
+ */
+ static final int DEFAULT_ARG_COUNT = CommonConfigurationKeys.NET_TOPOLOGY_SCRIPT_NUMBER_ARGS_DEFAULT;
+
+ /**
+ * key to the script filename {@value}
+ */
+ static final String SCRIPT_FILENAME_KEY = CommonConfigurationKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY;
+ /**
+ * key to the argument count that the script supports
+ * {@value}
+ */
+ static final String SCRIPT_ARG_COUNT_KEY = CommonConfigurationKeys.NET_TOPOLOGY_SCRIPT_NUMBER_ARGS_KEY;
+ /**
+ * Text used in the {@link #toString()} method if there is no script
+ * {@value}
+ */
+ public static final String NO_SCRIPT = "no script";
+
+ /**
+ * Create an instance with the default configuration.
+ * <p/>
+ * Calling {@link #setConf(Configuration)} will trigger a
+ * re-evaluation of the configuration settings and so be used to
+ * set up the mapping script.
+ *
+ */
+ public ScriptBasedMapping() {
+ super(new RawScriptBasedMapping());
+ }
+
+ /**
+ * Create an instance from the given configuration
+ * @param conf configuration
+ */
+ public ScriptBasedMapping(Configuration conf) {
+ this();
+ setConf(conf);
+ }
+
+ /**
+ * Get the cached mapping and convert it to its real type
+ * @return the inner raw script mapping.
+ */
+ private RawScriptBasedMapping getRawMapping() {
+ return (RawScriptBasedMapping) rawMapping;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return getRawMapping().getConf();
+ }
+
+ @Override
+ public String toString() {
+ return "script-based mapping with " + getRawMapping().toString();
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p/>
+ * This will get called in the superclass constructor, so a check is needed
+ * to ensure that the raw mapping is defined before trying to relay a null
+ * configuration.
+ * @param conf
+ */
+ @Override
+ public void setConf(Configuration conf) {
+ super.setConf(conf);
+ getRawMapping().setConf(conf);
+ }
+
+ /**
+ * This is the uncached script mapping that is fed into the cache managed
+ * by the superclass {@link CachedDNSToSwitchMapping}
+ */
+ private static final class RawScriptBasedMapping extends AbstractDNSToSwitchMapping {
+ private String scriptName;
+ private int maxArgs; //max hostnames per call of the script
+ private static final Log LOG = LogFactory.getLog(ScriptBasedMapping.class);
+
+ /**
+ * Set the configuration and extract the configuration parameters of interest
+ * @param conf the new configuration
+ */
+ @Override
+ public void setConf(Configuration conf) {
+ super.setConf(conf);
+ if (conf != null) {
+ scriptName = conf.getString(SCRIPT_FILENAME_KEY);
+ maxArgs = conf.getInt(SCRIPT_ARG_COUNT_KEY, DEFAULT_ARG_COUNT);
+ } else {
+ scriptName = null;
+ maxArgs = 0;
+ }
+ }
+
+ /**
+ * Constructor. The mapping is not ready to use until
+ * {@link #setConf(Configuration)} has been called
+ */
+ public RawScriptBasedMapping() {
+ }
+
+ @Override
+ public List<String> resolve(List<String> names) {
+ List<String> m = new ArrayList<String>(names.size());
+
+ if (names.isEmpty()) {
+ return m;
+ }
+
+ if (scriptName == null) {
+ for (int i = 0; i < names.size(); i++) {
+ m.add(NetworkTopology.DEFAULT_RACK);
+ }
+ return m;
+ }
+
+ String output = runResolveCommand(names);
+ if (output != null) {
+ StringTokenizer allSwitchInfo = new StringTokenizer(output);
+ while (allSwitchInfo.hasMoreTokens()) {
+ String switchInfo = allSwitchInfo.nextToken();
+ m.add(switchInfo);
+ }
+
+ if (m.size() != names.size()) {
+ // invalid number of entries returned by the script
+ LOG.error("Script " + scriptName + " returned " + Integer.toString(m.size()) + " values when "
+ + Integer.toString(names.size()) + " were expected.");
+ return null;
+ }
+ } else {
+ // an error occurred. return null to signify this.
+ // (exn was already logged in runResolveCommand)
+ return null;
+ }
+
+ return m;
+ }
+
+ /**
+ * Build and execute the resolution command. The command is
+ * executed in the directory specified by the system property
+ * "user.dir" if set; otherwise the current working directory is used
+ * @param args a list of arguments
+ * @return null if the number of arguments is out of range,
+ * or the output of the command.
+ */
+ private String runResolveCommand(List<String> args) {
+ int loopCount = 0;
+ if (args.size() == 0) {
+ return null;
+ }
+ StringBuilder allOutput = new StringBuilder();
+ int numProcessed = 0;
+ if (maxArgs < MIN_ALLOWABLE_ARGS) {
+ LOG.warn("Invalid value " + Integer.toString(maxArgs) + " for " + SCRIPT_ARG_COUNT_KEY
+ + "; must be >= " + Integer.toString(MIN_ALLOWABLE_ARGS));
+ return null;
+ }
+
+ while (numProcessed != args.size()) {
+ int start = maxArgs * loopCount;
+ List<String> cmdList = new ArrayList<String>();
+ cmdList.add(scriptName);
+ for (numProcessed = start;
+ numProcessed < (start + maxArgs) && numProcessed < args.size();
+ numProcessed++) {
+ cmdList.add(args.get(numProcessed));
+ }
+ File dir = null;
+ String userDir;
+ if ((userDir = System.getProperty("user.dir")) != null) {
+ dir = new File(userDir);
+ }
+ ShellCommandExecutor s = new ShellCommandExecutor(cmdList.toArray(new String[cmdList.size()]), dir);
+ try {
+ s.execute();
+ allOutput.append(s.getOutput()).append(" ");
+ } catch (Exception e) {
+ LOG.warn("Exception running " + s, e);
+ return null;
+ }
+ loopCount++;
+ }
+ return allOutput.toString();
+ }
+
+ /**
+ * Declare that the mapper is single-switched if a script was not named
+ * in the configuration.
+ * @return true iff there is no script
+ */
+ @Override
+ public boolean isSingleSwitch() {
+ return scriptName == null;
+ }
+
+ @Override
+ public String toString() {
+ return scriptName != null ? ("script " + scriptName) : NO_SCRIPT;
+ }
+
+ @Override
+ public void reloadCachedMappings() {
+ // Nothing to do here, since RawScriptBasedMapping has no cache, and
+ // does not inherit from CachedDNSToSwitchMapping
+ }
+ }
+}
Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/Shell.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/Shell.java?rev=1492274&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/Shell.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/Shell.java Wed Jun 12 16:41:47 2013
@@ -0,0 +1,677 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.util;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.google.common.base.Charsets;
+
+/**
+ * A base class for running a Unix command.
+ *
+ * <code>Shell</code> can be used to run unix commands like <code>du</code> or
+ * <code>df</code>. It also offers facilities to gate commands by
+ * time-intervals.
+ */
+abstract public class Shell {
+
+ public static final Log LOG = LogFactory.getLog(Shell.class);
+
+ private static boolean IS_JAVA7_OR_ABOVE =
+ System.getProperty("java.version").substring(0, 3).compareTo("1.7") >= 0;
+
+ public static boolean isJava7OrAbove() {
+ return IS_JAVA7_OR_ABOVE;
+ }
+
+ /** a Unix command to get the current user's name */
+ public final static String USER_NAME_COMMAND = "whoami";
+
+ /** Windows CreateProcess synchronization object */
+ public static final Object WindowsProcessLaunchLock = new Object();
+
+ /** a Unix command to get the current user's groups list */
+ public static String[] getGroupsCommand() {
+ return (WINDOWS) ? new String[] { "cmd", "/c", "groups" } : new String[] { "bash", "-c", "groups" };
+ }
+
+ /** a Unix command to get a given user's groups list */
+ public static String[] getGroupsForUserCommand(final String user) {
+ //'groups username' command return is non-consistent across different unixes
+ return (WINDOWS) ? new String[] { WINUTILS, "groups", "-F", "\"" + user + "\"" } : new String[] { "bash", "-c",
+ "id -Gn " + user };
+ }
+
+ /** a Unix command to get a given netgroup's user list */
+ public static String[] getUsersForNetgroupCommand(final String netgroup) {
+ //'groups username' command return is non-consistent across different unixes
+ return (WINDOWS) ? new String[] { "cmd", "/c", "getent netgroup " + netgroup } : new String[] { "bash", "-c",
+ "getent netgroup " + netgroup };
+ }
+
+ /** Return a command to get permission information. */
+ public static String[] getGetPermissionCommand() {
+ return (WINDOWS) ? new String[] { WINUTILS, "ls", "-F" } : new String[] { "/bin/ls", "-ld" };
+ }
+
+ /** Return a command to set permission */
+ public static String[] getSetPermissionCommand(String perm, boolean recursive) {
+ if (recursive) {
+ return (WINDOWS) ? new String[] { WINUTILS, "chmod", "-R", perm } : new String[] { "chmod", "-R", perm };
+ } else {
+ return (WINDOWS) ? new String[] { WINUTILS, "chmod", perm } : new String[] { "chmod", perm };
+ }
+ }
+
+ /**
+ * Return a command to set permission for specific file.
+ *
+ * @param perm String permission to set
+ * @param recursive boolean true to apply to all sub-directories recursively
+ * @param file String file to set
+ * @return String[] containing command and arguments
+ */
+ public static String[] getSetPermissionCommand(String perm, boolean recursive, String file) {
+ String[] baseCmd = getSetPermissionCommand(perm, recursive);
+ String[] cmdWithFile = Arrays.copyOf(baseCmd, baseCmd.length + 1);
+ cmdWithFile[cmdWithFile.length - 1] = file;
+ return cmdWithFile;
+ }
+
+ /** Return a command to set owner */
+ public static String[] getSetOwnerCommand(String owner) {
+ return (WINDOWS) ? new String[] { WINUTILS, "chown", "\"" + owner + "\"" } : new String[] { "chown", owner };
+ }
+
+ /** Return a command to create symbolic links */
+ public static String[] getSymlinkCommand(String target, String link) {
+ return WINDOWS ? new String[] { WINUTILS, "symlink", link, target } : new String[] { "ln", "-s", target, link };
+ }
+
+ /** Return a command for determining if process with specified pid is alive. */
+ public static String[] getCheckProcessIsAliveCommand(String pid) {
+ return Shell.WINDOWS ? new String[] { Shell.WINUTILS, "task", "isAlive", pid } : new String[] { "kill", "-0",
+ isSetsidAvailable ? "-" + pid : pid };
+ }
+
+ /** Return a command to send a signal to a given pid */
+ public static String[] getSignalKillCommand(int code, String pid) {
+ return Shell.WINDOWS ? new String[] { Shell.WINUTILS, "task", "kill", pid } : new String[] { "kill",
+ "-" + code, isSetsidAvailable ? "-" + pid : pid };
+ }
+
+ /**
+ * Returns a File referencing a script with the given basename, inside the
+ * given parent directory. The file extension is inferred by platform: ".cmd"
+ * on Windows, or ".sh" otherwise.
+ *
+ * @param parent File parent directory
+ * @param basename String script file basename
+ * @return File referencing the script in the directory
+ */
+ public static File appendScriptExtension(File parent, String basename) {
+ return new File(parent, appendScriptExtension(basename));
+ }
+
+ /**
+ * Returns a script file name with the given basename. The file extension is
+ * inferred by platform: ".cmd" on Windows, or ".sh" otherwise.
+ *
+ * @param basename String script file basename
+ * @return String script file name
+ */
+ public static String appendScriptExtension(String basename) {
+ return basename + (WINDOWS ? ".cmd" : ".sh");
+ }
+
+ /**
+ * Returns a command to run the given script. The script interpreter is
+ * inferred by platform: cmd on Windows or bash otherwise.
+ *
+ * @param script File script to run
+ * @return String[] command to run the script
+ */
+ public static String[] getRunScriptCommand(File script) {
+ String absolutePath = script.getAbsolutePath();
+ return WINDOWS ? new String[] { "cmd", "/c", absolutePath } : new String[] { "/bin/bash", absolutePath };
+ }
+
+ /** a Unix command to set permission */
+ public static final String SET_PERMISSION_COMMAND = "chmod";
+ /** a Unix command to set owner */
+ public static final String SET_OWNER_COMMAND = "chown";
+
+ /** a Unix command to change the group ownership of a file */
+ public static final String SET_GROUP_COMMAND = "chgrp";
+ /** a Unix command to create a link */
+ public static final String LINK_COMMAND = "ln";
+ /** a Unix command to get a link target */
+ public static final String READ_LINK_COMMAND = "readlink";
+
+ /** Time after which the executing script would be timed out */
+ protected long timeOutInterval = 0L;
+ /** Whether or not the script timed out */
+ private AtomicBoolean timedOut;
+
+ /** Centralized logic to discover and validate the sanity of the Hadoop
+ * home directory. Returns either NULL or a directory that exists and
+ * was specified via either -Dhadoop.home.dir or the HADOOP_HOME ENV
+ * variable. This does a lot of work so it should only be called
+ * privately for initialization once per process.
+ **/
+ private static String checkHadoopHome() {
+
+ // first check the Dflag hadoop.home.dir with JVM scope
+ String home = System.getProperty("hadoop.home.dir");
+
+ // fall back to the system/user-global env variable
+ if (home == null) {
+ home = System.getenv("HADOOP_HOME");
+ }
+
+ try {
+ // couldn't find either setting for hadoop's home directory
+ if (home == null) {
+ throw new IOException("HADOOP_HOME or hadoop.home.dir are not set.");
+ }
+
+ if (home.startsWith("\"") && home.endsWith("\"")) {
+ home = home.substring(1, home.length() - 1);
+ }
+
+ // check that the home setting is actually a directory that exists
+ File homedir = new File(home);
+ if (!homedir.isAbsolute() || !homedir.exists() || !homedir.isDirectory()) {
+ throw new IOException("Hadoop home directory " + homedir
+ + " does not exist, is not a directory, or is not an absolute path.");
+ }
+
+ home = homedir.getCanonicalPath();
+
+ } catch (IOException ioe) {
+ LOG.error("Failed to detect a valid hadoop home directory", ioe);
+ home = null;
+ }
+
+ return home;
+ }
+
+ private static String HADOOP_HOME_DIR = checkHadoopHome();
+
+ // Public getter, throws an exception if HADOOP_HOME failed validation
+ // checks and is being referenced downstream.
+ public static final String getHadoopHome() throws IOException {
+ if (HADOOP_HOME_DIR == null) {
+ throw new IOException("Misconfigured HADOOP_HOME cannot be referenced.");
+ }
+
+ return HADOOP_HOME_DIR;
+ }
+
+ /** fully qualify the path to a binary that should be in a known hadoop
+ * bin location. This is primarily useful for disambiguating call-outs
+ * to executable sub-components of Hadoop to avoid clashes with other
+ * executables that may be in the path. Caveat: this call doesn't
+ * just format the path to the bin directory. It also checks for file
+ * existence of the composed path. The output of this call should be
+ * cached by callers.
+ * */
+ public static final String getQualifiedBinPath(String executable) throws IOException {
+ // construct hadoop bin path to the specified executable
+ String fullExeName = HADOOP_HOME_DIR + File.separator + "bin" + File.separator + executable;
+
+ File exeFile = new File(fullExeName);
+ if (!exeFile.exists()) {
+ throw new IOException("Could not locate executable " + fullExeName + " in the Hadoop binaries.");
+ }
+
+ return exeFile.getCanonicalPath();
+ }
+
+ /** Set to true on Windows platforms */
+ public static final boolean WINDOWS /* borrowed from Path.WINDOWS */
+ = System.getProperty("os.name").startsWith("Windows");
+
+ public static final boolean LINUX = System.getProperty("os.name").startsWith("Linux");
+
+ /** a Windows utility to emulate Unix commands */
+ public static final String WINUTILS = getWinUtilsPath();
+
+ public static final String getWinUtilsPath() {
+ String winUtilsPath = null;
+
+ try {
+ if (WINDOWS) {
+ winUtilsPath = getQualifiedBinPath("winutils.exe");
+ }
+ } catch (IOException ioe) {
+ LOG.error("Failed to locate the winutils binary in the hadoop binary path", ioe);
+ }
+
+ return winUtilsPath;
+ }
+
+ public static final boolean isSetsidAvailable = isSetsidSupported();
+
+ private static boolean isSetsidSupported() {
+ if (Shell.WINDOWS) {
+ return false;
+ }
+ ShellCommandExecutor shexec = null;
+ boolean setsidSupported = true;
+ try {
+ String[] args = { "setsid", "bash", "-c", "echo $$" };
+ shexec = new ShellCommandExecutor(args);
+ shexec.execute();
+ } catch (IOException ioe) {
+ LOG.warn("setsid is not available on this machine. So not using it.");
+ setsidSupported = false;
+ } finally { // handle the exit code
+ if (null != shexec) {
+ LOG.info("setsid exited with exit code " + shexec.getExitCode());
+ }
+ }
+ return setsidSupported;
+ }
+
+ /** Token separator regex used to parse Shell tool outputs */
+ public static final String TOKEN_SEPARATOR_REGEX = WINDOWS ? "[|\n\r]" : "[ \t\n\r\f]";
+
+ private long interval; // refresh interval in msec
+ private long lastTime; // last time the command was performed
+ private Map<String, String> environment; // env for the command execution
+ private File dir;
+ private Process process; // sub process used to execute the command
+ private int exitCode;
+
+ /** Whether or not the script finished executing */
+ private volatile AtomicBoolean completed;
+
+ public Shell() {
+ this(0L);
+ }
+
+ /**
+ * @param interval the minimum duration to wait before re-executing the
+ * command.
+ */
+ public Shell(long interval) {
+ this.interval = interval;
+ this.lastTime = (interval < 0) ? 0 : -interval;
+ }
+
+ /** set the environment for the command
+ * @param env Mapping of environment variables
+ */
+ protected void setEnvironment(Map<String, String> env) {
+ this.environment = env;
+ }
+
+ /** set the working directory
+ * @param dir The directory where the command would be executed
+ */
+ protected void setWorkingDirectory(File dir) {
+ this.dir = dir;
+ }
+
+ /** check to see if a command needs to be executed and execute if needed */
+ protected void run() throws IOException {
+ if (lastTime + interval > MathUtils.now())
+ return;
+ exitCode = 0; // reset for next run
+ runCommand();
+ }
+
+ /** Run a command */
+ private void runCommand() throws IOException {
+ ProcessBuilder builder = new ProcessBuilder(getExecString());
+ Timer timeOutTimer = null;
+ ShellTimeoutTimerTask timeoutTimerTask = null;
+ timedOut = new AtomicBoolean(false);
+ completed = new AtomicBoolean(false);
+
+ if (environment != null) {
+ builder.environment().putAll(this.environment);
+ }
+ if (dir != null) {
+ builder.directory(this.dir);
+ }
+
+ if (Shell.WINDOWS) {
+ synchronized (WindowsProcessLaunchLock) {
+ // To workaround the race condition issue with child processes
+ // inheriting unintended handles during process launch that can
+ // lead to hangs on reading output and error streams, we
+ // serialize process creation. More info available at:
+ // http://support.microsoft.com/kb/315939
+ process = builder.start();
+ }
+ } else {
+ process = builder.start();
+ }
+
+ if (timeOutInterval > 0) {
+ timeOutTimer = new Timer("Shell command timeout");
+ timeoutTimerTask = new ShellTimeoutTimerTask(this);
+ //One time scheduling.
+ timeOutTimer.schedule(timeoutTimerTask, timeOutInterval);
+ }
+ final BufferedReader errReader = new BufferedReader(new InputStreamReader(process.getErrorStream(),
+ Charsets.UTF_8));
+ BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream(), Charsets.UTF_8));
+ final StringBuffer errMsg = new StringBuffer();
+
+ // read error and input streams as this would free up the buffers
+ // free the error stream buffer
+ Thread errThread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ String line = errReader.readLine();
+ while ((line != null) && !isInterrupted()) {
+ errMsg.append(line);
+ errMsg.append(System.getProperty("line.separator"));
+ line = errReader.readLine();
+ }
+ } catch (IOException ioe) {
+ LOG.warn("Error reading the error stream", ioe);
+ }
+ }
+ };
+ try {
+ errThread.start();
+ } catch (IllegalStateException ise) {
+ }
+ try {
+ parseExecResult(inReader); // parse the output
+ // clear the input stream buffer
+ String line = inReader.readLine();
+ while (line != null) {
+ line = inReader.readLine();
+ }
+ // wait for the process to finish and check the exit code
+ exitCode = process.waitFor();
+ try {
+ // make sure that the error thread exits
+ errThread.join();
+ } catch (InterruptedException ie) {
+ LOG.warn("Interrupted while reading the error stream", ie);
+ }
+ completed.set(true);
+ //the timeout thread handling
+ //taken care in finally block
+ if (exitCode != 0) {
+ throw new ExitCodeException(exitCode, errMsg.toString());
+ }
+ } catch (InterruptedException ie) {
+ throw new IOException(ie.toString());
+ } finally {
+ if (timeOutTimer != null) {
+ timeOutTimer.cancel();
+ }
+ // close the input stream
+ try {
+ inReader.close();
+ } catch (IOException ioe) {
+ LOG.warn("Error while closing the input stream", ioe);
+ }
+ if (!completed.get()) {
+ errThread.interrupt();
+ }
+ try {
+ errReader.close();
+ } catch (IOException ioe) {
+ LOG.warn("Error while closing the error stream", ioe);
+ }
+ process.destroy();
+ lastTime = MathUtils.now();
+ }
+ }
+
+ /** return an array containing the command name & its parameters */
+ protected abstract String[] getExecString();
+
+ /** Parse the execution result */
+ protected abstract void parseExecResult(BufferedReader lines) throws IOException;
+
+ /** get the current sub-process executing the given command
+ * @return process executing the command
+ */
+ public Process getProcess() {
+ return process;
+ }
+
+ /** get the exit code
+ * @return the exit code of the process
+ */
+ public int getExitCode() {
+ return exitCode;
+ }
+
+ /**
+ * This is an IOException with exit code added.
+ */
+ public static class ExitCodeException extends IOException {
+ private static final long serialVersionUID = 2241095121609500810L;
+
+ int exitCode;
+
+ public ExitCodeException(int exitCode, String message) {
+ super(message);
+ this.exitCode = exitCode;
+ }
+
+ public int getExitCode() {
+ return exitCode;
+ }
+ }
+
+ /**
+ * A simple shell command executor.
+ *
+ * <code>ShellCommandExecutor</code> should be used in cases where the output
+ * of the command needs no explicit parsing and where the command, working
+ * directory and the environment remain unchanged. The output of the command
+ * is stored as-is and is expected to be small.
+ */
+ public static class ShellCommandExecutor extends Shell {
+
+ private String[] command;
+ private StringBuffer output;
+
+ public ShellCommandExecutor(String[] execString) {
+ this(execString, null);
+ }
+
+ public ShellCommandExecutor(String[] execString, File dir) {
+ this(execString, dir, null);
+ }
+
+ public ShellCommandExecutor(String[] execString, File dir, Map<String, String> env) {
+ this(execString, dir, env, 0L);
+ }
+
+ /**
+ * Create a new instance of the ShellCommandExecutor to execute a command.
+ *
+ * @param execString The command to execute with arguments
+ * @param dir If not-null, specifies the directory which should be set
+ * as the current working directory for the command.
+ * If null, the current working directory is not modified.
+ * @param env If not-null, environment of the command will include the
+ * key-value pairs specified in the map. If null, the current
+ * environment is not modified.
+ * @param timeout Specifies the time in milliseconds, after which the
+ * command will be killed and the status marked as timedout.
+ * If 0, the command will not be timed out.
+ */
+ public ShellCommandExecutor(String[] execString, File dir, Map<String, String> env, long timeout) {
+ command = execString.clone();
+ if (dir != null) {
+ setWorkingDirectory(dir);
+ }
+ if (env != null) {
+ setEnvironment(env);
+ }
+ timeOutInterval = timeout;
+ }
+
+ /** Execute the shell command. */
+ public void execute() throws IOException {
+ this.run();
+ }
+
+ @Override
+ public String[] getExecString() {
+ return command.clone();
+ }
+
+ @Override
+ protected void parseExecResult(BufferedReader lines) throws IOException {
+ output = new StringBuffer();
+ char[] buf = new char[512];
+ int nRead;
+ while ((nRead = lines.read(buf, 0, buf.length)) > 0) {
+ output.append(buf, 0, nRead);
+ }
+ }
+
+ /** Get the output of the shell command.*/
+ public String getOutput() {
+ return (output == null) ? "" : output.toString();
+ }
+
+ /**
+ * Returns the commands of this instance.
+ * Arguments with spaces in are presented with quotes round; other
+ * arguments are presented raw
+ *
+ * @return a string representation of the object.
+ */
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ String[] args = getExecString();
+ for (String s : args) {
+ if (s.indexOf(' ') >= 0) {
+ builder.append('"').append(s).append('"');
+ } else {
+ builder.append(s);
+ }
+ builder.append(' ');
+ }
+ return builder.toString();
+ }
+ }
+
+ /**
+  * Check whether the script passed to the shell command executor timed out.
+  *
+  * @return true if the script timed out.
+  */
+ public boolean isTimedOut() {
+     boolean expired = timedOut.get();
+     return expired;
+ }
+
+ /**
+  * Record that the command has timed out.
+  */
+ private void setTimedOut() {
+     timedOut.set(true);
+ }
+
+ /**
+  * Static method to execute a shell command with no extra environment
+  * and no timeout.
+  * Covers most of the simple cases without requiring the user to implement
+  * the <code>Shell</code> interface.
+  *
+  * @param cmd shell command to execute.
+  * @return the output of the executed command.
+  * @throws IOException propagated from the three-argument overload
+  */
+ public static String execCommand(String... cmd) throws IOException {
+     final Map<String, String> noEnv = null;
+     return execCommand(noEnv, cmd, 0L);
+ }
+
+ /**
+  * Static method to execute a shell command.
+  * Covers most of the simple cases without requiring the user to implement
+  * the <code>Shell</code> interface.
+  *
+  * @param env the map of environment key=value
+  * @param cmd shell command to execute.
+  * @param timeout time in milliseconds after which script should be marked timeout
+  * @return the output of the executed command.
+  * @throws IOException propagated from {@code ShellCommandExecutor.execute()}
+  */
+ public static String execCommand(Map<String, String> env, String[] cmd, long timeout) throws IOException {
+     // No working directory is set (second constructor argument is null).
+     final ShellCommandExecutor executor = new ShellCommandExecutor(cmd, null, env, timeout);
+     executor.execute();
+     final String result = executor.getOutput();
+     return result;
+ }
+
+ /**
+  * Static method to execute a shell command with no timeout.
+  * Covers most of the simple cases without requiring the user to implement
+  * the <code>Shell</code> interface.
+  *
+  * @param env the map of environment key=value
+  * @param cmd shell command to execute.
+  * @return the output of the executed command.
+  * @throws IOException propagated from the three-argument overload
+  */
+ public static String execCommand(Map<String, String> env, String... cmd) throws IOException {
+     final long noTimeout = 0L;
+     return execCommand(env, cmd, noTimeout);
+ }
+
+ /**
+  * Timer task which is used to time out scripts spawned off by the shell.
+  *
+  * <p>When run, it destroys the shell's sub-process and marks the shell as
+  * timed out, but only if the process has not exited and the shell has not
+  * been flagged as completed.
+  */
+ private static class ShellTimeoutTimerTask extends TimerTask {
+
+     /** Shell whose sub-process this task watches. */
+     private final Shell shell;
+
+     /**
+      * @param shell the shell whose sub-process should be timed out
+      */
+     public ShellTimeoutTimerTask(Shell shell) {
+         this.shell = shell;
+     }
+
+     @Override
+     public void run() {
+         Process p = shell.getProcess();
+         if (p == null) {
+             // No sub-process to act on. Checked explicitly rather than
+             // relying on a caught NullPointerException.
+             return;
+         }
+         try {
+             // Used only as a liveness probe; the value itself is not needed.
+             p.exitValue();
+         } catch (IllegalThreadStateException e) {
+             // Process has not terminated.
+             // So check if it has completed;
+             // if not, just destroy it.
+             if (!shell.completed.get()) {
+                 shell.setTimedOut();
+                 p.destroy();
+             }
+         }
+     }
+ }
+}
Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java?rev=1492274&r1=1492273&r2=1492274&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java Wed Jun 12 16:41:47 2013
@@ -279,7 +279,7 @@ public class LedgerRecoveryTest extends
public void openComplete(int rc, LedgerHandle lh, Object ctx) {
returnCode.set(rc);
openLatch.countDown();
- if (rc != BKException.Code.OK) {
+ if (rc == BKException.Code.OK) {
try {
lh.close();
} catch (Exception e) {