You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2013/06/12 18:41:48 UTC

svn commit: r1492274 [2/3] - in /zookeeper/bookkeeper/trunk: ./ bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ bookkeeper-server/src/main/java/org/apache/bookkeeper/net/ bookkee...

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopology.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopology.java?rev=1492274&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopology.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopology.java Wed Jun 12 16:41:47 2013
@@ -0,0 +1,876 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.net;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The class represents a cluster of computer with a tree hierarchical
+ * network topology.
+ * For example, a cluster may be consists of many data centers filled
+ * with racks of computers.
+ * In a network topology, leaves represent data nodes (computers) and inner
+ * nodes represent switches/routers that manage traffic in/out of data centers
+ * or racks.
+ *
+ */
+public class NetworkTopology {
+
+    public final static String DEFAULT_RACK = "/default-rack";
+    public final static int DEFAULT_HOST_LEVEL = 2;
+    public static final Logger LOG = LoggerFactory.getLogger(NetworkTopology.class);
+
+    public static class InvalidTopologyException extends RuntimeException {
+        private static final long serialVersionUID = 1L;
+
+        public InvalidTopologyException(String msg) {
+            super(msg);
+        }
+    }
+
+    /** InnerNode represents a switch/router of a data center or rack.
+     * Different from a leaf node, it has non-null children.
+     */
+    static class InnerNode extends NodeBase {
+        protected List<Node> children = new ArrayList<Node>();
+        private int numOfLeaves;
+
+        /** Construct an InnerNode from a path-like string */
+        InnerNode(String path) {
+            super(path);
+        }
+
+        /** Construct an InnerNode from its name and its network location */
+        InnerNode(String name, String location) {
+            super(name, location);
+        }
+
+        /** Construct an InnerNode
+         * from its name, its network location, its parent, and its level */
+        InnerNode(String name, String location, InnerNode parent, int level) {
+            super(name, location, parent, level);
+        }
+
+        /** @return its children */
+        List<Node> getChildren() {
+            return children;
+        }
+
+        /** @return the number of children this node has */
+        int getNumOfChildren() {
+            return children.size();
+        }
+
+        /** Judge if this node represents a rack
+         * @return true if it has no child or its children are not InnerNodes
+         */
+        boolean isRack() {
+            if (children.isEmpty()) {
+                return true;
+            }
+
+            Node firstChild = children.get(0);
+            if (firstChild instanceof InnerNode) {
+                return false;
+            }
+
+            return true;
+        }
+
+        /** Judge if this node is an ancestor of node <i>n</i>
+         *
+         * @param n a node
+         * @return true if this node is an ancestor of <i>n</i>
+         */
+        boolean isAncestor(Node n) {
+            return getPath(this).equals(NodeBase.PATH_SEPARATOR_STR)
+                    || (n.getNetworkLocation() + NodeBase.PATH_SEPARATOR_STR).startsWith(getPath(this)
+                            + NodeBase.PATH_SEPARATOR_STR);
+        }
+
+        /** Judge if this node is the parent of node <i>n</i>
+         *
+         * @param n a node
+         * @return true if this node is the parent of <i>n</i>
+         */
+        boolean isParent(Node n) {
+            return n.getNetworkLocation().equals(getPath(this));
+        }
+
+        /* Return a child name of this node who is an ancestor of node <i>n</i> */
+        private String getNextAncestorName(Node n) {
+            if (!isAncestor(n)) {
+                throw new IllegalArgumentException(this + "is not an ancestor of " + n);
+            }
+            String name = n.getNetworkLocation().substring(getPath(this).length());
+            if (name.charAt(0) == PATH_SEPARATOR) {
+                name = name.substring(1);
+            }
+            int index = name.indexOf(PATH_SEPARATOR);
+            if (index != -1)
+                name = name.substring(0, index);
+            return name;
+        }
+
+        /** Add node <i>n</i> to the subtree of this node
+         * @param n node to be added
+         * @return true if the node is added; false otherwise
+         */
+        boolean add(Node n) {
+            if (!isAncestor(n))
+                throw new IllegalArgumentException(n.getName() + ", which is located at " + n.getNetworkLocation()
+                        + ", is not a decendent of " + getPath(this));
+            if (isParent(n)) {
+                // this node is the parent of n; add n directly
+                n.setParent(this);
+                n.setLevel(this.level + 1);
+                for (int i = 0; i < children.size(); i++) {
+                    if (children.get(i).getName().equals(n.getName())) {
+                        children.set(i, n);
+                        return false;
+                    }
+                }
+                children.add(n);
+                numOfLeaves++;
+                return true;
+            } else {
+                // find the next ancestor node
+                String parentName = getNextAncestorName(n);
+                InnerNode parentNode = null;
+                for (int i = 0; i < children.size(); i++) {
+                    if (children.get(i).getName().equals(parentName)) {
+                        parentNode = (InnerNode) children.get(i);
+                        break;
+                    }
+                }
+                if (parentNode == null) {
+                    // create a new InnerNode
+                    parentNode = createParentNode(parentName);
+                    children.add(parentNode);
+                }
+                // add n to the subtree of the next ancestor node
+                if (parentNode.add(n)) {
+                    numOfLeaves++;
+                    return true;
+                } else {
+                    return false;
+                }
+            }
+        }
+
+        /**
+         * Creates a parent node to be added to the list of children.
+         * Creates a node using the InnerNode four argument constructor specifying
+         * the name, location, parent, and level of this node.
+         *
+         * <p>To be overridden in subclasses for specific InnerNode implementations,
+         * as alternative to overriding the full {@link #add(Node)} method.
+         *
+         * @param parentName The name of the parent node
+         * @return A new inner node
+         * @see InnerNode#InnerNode(String, String, InnerNode, int)
+         */
+        protected InnerNode createParentNode(String parentName) {
+            return new InnerNode(parentName, getPath(this), this, this.getLevel() + 1);
+        }
+
+        /** Remove node <i>n</i> from the subtree of this node
+         * @param n node to be deleted
+         * @return true if the node is deleted; false otherwise
+         */
+        boolean remove(Node n) {
+            String parent = n.getNetworkLocation();
+            String currentPath = getPath(this);
+            if (!isAncestor(n))
+                throw new IllegalArgumentException(n.getName() + ", which is located at " + parent
+                        + ", is not a descendent of " + currentPath);
+            if (isParent(n)) {
+                // this node is the parent of n; remove n directly
+                for (int i = 0; i < children.size(); i++) {
+                    if (children.get(i).getName().equals(n.getName())) {
+                        children.remove(i);
+                        numOfLeaves--;
+                        n.setParent(null);
+                        return true;
+                    }
+                }
+                return false;
+            } else {
+                // find the next ancestor node: the parent node
+                String parentName = getNextAncestorName(n);
+                InnerNode parentNode = null;
+                int i;
+                for (i = 0; i < children.size(); i++) {
+                    if (children.get(i).getName().equals(parentName)) {
+                        parentNode = (InnerNode) children.get(i);
+                        break;
+                    }
+                }
+                if (parentNode == null) {
+                    return false;
+                }
+                // remove n from the parent node
+                boolean isRemoved = parentNode.remove(n);
+                // if the parent node has no children, remove the parent node too
+                if (isRemoved) {
+                    if (parentNode.getNumOfChildren() == 0) {
+                        children.remove(i);
+                    }
+                    numOfLeaves--;
+                }
+                return isRemoved;
+            }
+        } // end of remove
+
+        /** Given a node's string representation, return a reference to the node
+         * @param loc string location of the form /rack/node
+         * @return null if the node is not found or the childnode is there but
+         * not an instance of {@link InnerNode}
+         */
+        private Node getLoc(String loc) {
+            if (loc == null || loc.length() == 0)
+                return this;
+
+            String[] path = loc.split(PATH_SEPARATOR_STR, 2);
+            Node childnode = null;
+            for (int i = 0; i < children.size(); i++) {
+                if (children.get(i).getName().equals(path[0])) {
+                    childnode = children.get(i);
+                }
+            }
+            if (childnode == null)
+                return null; // non-existing node
+            if (path.length == 1)
+                return childnode;
+            if (childnode instanceof InnerNode) {
+                return ((InnerNode) childnode).getLoc(path[1]);
+            } else {
+                return null;
+            }
+        }
+
+        /** get <i>leafIndex</i> leaf of this subtree
+         * if it is not in the <i>excludedNode</i>
+         *
+         * @param leafIndex an indexed leaf of the node
+         * @param excludedNode an excluded node (can be null)
+         * @return
+         */
+        Node getLeaf(int leafIndex, Node excludedNode) {
+            int count = 0;
+            // check if the excluded node a leaf
+            boolean isLeaf = excludedNode == null || !(excludedNode instanceof InnerNode);
+            // calculate the total number of excluded leaf nodes
+            int numOfExcludedLeaves = isLeaf ? 1 : ((InnerNode) excludedNode).getNumOfLeaves();
+            if (isLeafParent()) { // children are leaves
+                if (isLeaf) { // excluded node is a leaf node
+                    int excludedIndex = children.indexOf(excludedNode);
+                    if (excludedIndex != -1 && leafIndex >= 0) {
+                        // excluded node is one of the children so adjust the leaf index
+                        leafIndex = leafIndex >= excludedIndex ? leafIndex + 1 : leafIndex;
+                    }
+                }
+                // range check
+                if (leafIndex < 0 || leafIndex >= this.getNumOfChildren()) {
+                    return null;
+                }
+                return children.get(leafIndex);
+            } else {
+                for (int i = 0; i < children.size(); i++) {
+                    InnerNode child = (InnerNode) children.get(i);
+                    if (excludedNode == null || excludedNode != child) {
+                        // not the excludedNode
+                        int numOfLeaves = child.getNumOfLeaves();
+                        if (excludedNode != null && child.isAncestor(excludedNode)) {
+                            numOfLeaves -= numOfExcludedLeaves;
+                        }
+                        if (count + numOfLeaves > leafIndex) {
+                            // the leaf is in the child subtree
+                            return child.getLeaf(leafIndex - count, excludedNode);
+                        } else {
+                            // go to the next child
+                            count = count + numOfLeaves;
+                        }
+                    } else { // it is the excluededNode
+                        // skip it and set the excludedNode to be null
+                        excludedNode = null;
+                    }
+                }
+                return null;
+            }
+        }
+
+        protected boolean isLeafParent() {
+            return isRack();
+        }
+
+        /**
+          * Determine if children a leaves, default implementation calls {@link #isRack()}
+          * <p>To be overridden in subclasses for specific InnerNode implementations,
+          * as alternative to overriding the full {@link #getLeaf(int, Node)} method.
+          *
+          * @return true if children are leaves, false otherwise
+          */
+        protected boolean areChildrenLeaves() {
+            return isRack();
+        }
+
+        /**
+         * Get number of leaves.
+         */
+        int getNumOfLeaves() {
+            return numOfLeaves;
+        }
+    } // end of InnerNode
+
+    /**
+     * the root cluster map
+     */
+    InnerNode clusterMap;
+    /** Depth of all leaf nodes */
+    private int depthOfAllLeaves = -1;
+    /** rack counter */
+    protected int numOfRacks = 0;
+    /** the lock used to manage access */
+    protected ReadWriteLock netlock = new ReentrantReadWriteLock();
+
+    public NetworkTopology() {
+        clusterMap = new InnerNode(InnerNode.ROOT);
+    }
+
+    /** Add a leaf node
+     * Update node counter & rack counter if necessary
+     * @param node node to be added; can be null
+     * @exception IllegalArgumentException if add a node to a leave
+                                           or node to be added is not a leaf
+     */
+    public void add(Node node) {
+        if (node == null)
+            return;
+        String oldTopoStr = this.toString();
+        if (node instanceof InnerNode) {
+            throw new IllegalArgumentException("Not allow to add an inner node: " + NodeBase.getPath(node));
+        }
+        int newDepth = NodeBase.locationToDepth(node.getNetworkLocation()) + 1;
+        netlock.writeLock().lock();
+        try {
+            if ((depthOfAllLeaves != -1) && (depthOfAllLeaves != newDepth)) {
+                LOG.error("Error: can't add leaf node at depth " + newDepth + " to topology:\n" + oldTopoStr);
+                throw new InvalidTopologyException("Invalid network topology. "
+                        + "You cannot have a rack and a non-rack node at the same level of the network topology.");
+            }
+            Node rack = getNodeForNetworkLocation(node);
+            if (rack != null && !(rack instanceof InnerNode)) {
+                throw new IllegalArgumentException("Unexpected data node " + node.toString()
+                        + " at an illegal network location");
+            }
+            if (clusterMap.add(node)) {
+                LOG.info("Adding a new node: " + NodeBase.getPath(node));
+                if (rack == null) {
+                    numOfRacks++;
+                }
+                if (!(node instanceof InnerNode)) {
+                    if (depthOfAllLeaves == -1) {
+                        depthOfAllLeaves = node.getLevel();
+                    }
+                }
+            }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("NetworkTopology became:\n" + this.toString());
+            }
+        } finally {
+            netlock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Return a reference to the node given its string representation.
+     * Default implementation delegates to {@link #getNode(String)}.
+     *
+     * <p>To be overridden in subclasses for specific NetworkTopology
+     * implementations, as alternative to overriding the full {@link #add(Node)}
+     *  method.
+     *
+     * @param node The string representation of this node's network location is
+     * used to retrieve a Node object.
+     * @return a reference to the node; null if the node is not in the tree
+     *
+     * @see #add(Node)
+     * @see #getNode(String)
+     */
+    protected Node getNodeForNetworkLocation(Node node) {
+        return getNode(node.getNetworkLocation());
+    }
+
+    /**
+     * Given a string representation of a rack, return its children
+     * @param loc a path-like string representation of a rack
+     * @return a newly allocated list with all the node's children
+     */
+    public List<Node> getDatanodesInRack(String loc) {
+        netlock.readLock().lock();
+        try {
+            loc = NodeBase.normalize(loc);
+            if (!NodeBase.ROOT.equals(loc)) {
+                loc = loc.substring(1);
+            }
+            InnerNode rack = (InnerNode) clusterMap.getLoc(loc);
+            if (rack == null) {
+                return null;
+            }
+            return new ArrayList<Node>(rack.getChildren());
+        } finally {
+            netlock.readLock().unlock();
+        }
+    }
+
+    /** Remove a node
+     * Update node counter and rack counter if necessary
+     * @param node node to be removed; can be null
+     */
+    public void remove(Node node) {
+        if (node == null)
+            return;
+        if (node instanceof InnerNode) {
+            throw new IllegalArgumentException("Not allow to remove an inner node: " + NodeBase.getPath(node));
+        }
+        LOG.info("Removing a node: " + NodeBase.getPath(node));
+        netlock.writeLock().lock();
+        try {
+            if (clusterMap.remove(node)) {
+                InnerNode rack = (InnerNode) getNode(node.getNetworkLocation());
+                if (rack == null) {
+                    numOfRacks--;
+                }
+            }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("NetworkTopology became:\n" + this.toString());
+            }
+        } finally {
+            netlock.writeLock().unlock();
+        }
+    }
+
+    /** Check if the tree contains node <i>node</i>
+     *
+     * @param node a node
+     * @return true if <i>node</i> is already in the tree; false otherwise
+     */
+    public boolean contains(Node node) {
+        if (node == null)
+            return false;
+        netlock.readLock().lock();
+        try {
+            Node parent = node.getParent();
+            for (int level = node.getLevel(); parent != null && level > 0; parent = parent.getParent(), level--) {
+                if (parent == clusterMap) {
+                    return true;
+                }
+            }
+        } finally {
+            netlock.readLock().unlock();
+        }
+        return false;
+    }
+
+    /** Given a string representation of a node, return its reference
+     *
+     * @param loc
+     *          a path-like string representation of a node
+     * @return a reference to the node; null if the node is not in the tree
+     */
+    public Node getNode(String loc) {
+        netlock.readLock().lock();
+        try {
+            loc = NodeBase.normalize(loc);
+            if (!NodeBase.ROOT.equals(loc))
+                loc = loc.substring(1);
+            return clusterMap.getLoc(loc);
+        } finally {
+            netlock.readLock().unlock();
+        }
+    }
+
+    /** Given a string representation of a rack for a specific network
+     *  location
+     *
+     * To be overridden in subclasses for specific NetworkTopology
+     * implementations, as alternative to overriding the full
+     * {@link #getRack(String)} method.
+     * @param loc
+     *          a path-like string representation of a network location
+     * @return a rack string
+     */
+    public String getRack(String loc) {
+        return loc;
+    }
+
+    /** @return the total number of racks */
+    public int getNumOfRacks() {
+        netlock.readLock().lock();
+        try {
+            return numOfRacks;
+        } finally {
+            netlock.readLock().unlock();
+        }
+    }
+
+    /** @return the total number of leaf nodes */
+    public int getNumOfLeaves() {
+        netlock.readLock().lock();
+        try {
+            return clusterMap.getNumOfLeaves();
+        } finally {
+            netlock.readLock().unlock();
+        }
+    }
+
+    /** Return the distance between two nodes
+     * It is assumed that the distance from one node to its parent is 1
+     * The distance between two nodes is calculated by summing up their distances
+     * to their closest common ancestor.
+     * @param node1 one node
+     * @param node2 another node
+     * @return the distance between node1 and node2 which is zero if they are the same
+     *  or {@link Integer#MAX_VALUE} if node1 or node2 do not belong to the cluster
+     */
+    public int getDistance(Node node1, Node node2) {
+        if (node1 == node2) {
+            return 0;
+        }
+        Node n1 = node1, n2 = node2;
+        int dis = 0;
+        netlock.readLock().lock();
+        try {
+            int level1 = node1.getLevel(), level2 = node2.getLevel();
+            while (n1 != null && level1 > level2) {
+                n1 = n1.getParent();
+                level1--;
+                dis++;
+            }
+            while (n2 != null && level2 > level1) {
+                n2 = n2.getParent();
+                level2--;
+                dis++;
+            }
+            while (n1 != null && n2 != null && n1.getParent() != n2.getParent()) {
+                n1 = n1.getParent();
+                n2 = n2.getParent();
+                dis += 2;
+            }
+        } finally {
+            netlock.readLock().unlock();
+        }
+        if (n1 == null) {
+            LOG.warn("The cluster does not contain node: " + NodeBase.getPath(node1));
+            return Integer.MAX_VALUE;
+        }
+        if (n2 == null) {
+            LOG.warn("The cluster does not contain node: " + NodeBase.getPath(node2));
+            return Integer.MAX_VALUE;
+        }
+        return dis + 2;
+    }
+
+    /** Check if two nodes are on the same rack
+     * @param node1 one node (can be null)
+     * @param node2 another node (can be null)
+     * @return true if node1 and node2 are on the same rack; false otherwise
+     * @exception IllegalArgumentException when either node1 or node2 is null, or
+     * node1 or node2 do not belong to the cluster
+     */
+    public boolean isOnSameRack(Node node1, Node node2) {
+        if (node1 == null || node2 == null) {
+            return false;
+        }
+
+        netlock.readLock().lock();
+        try {
+            return isSameParents(node1, node2);
+        } finally {
+            netlock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Check if network topology is aware of NodeGroup
+     */
+    public boolean isNodeGroupAware() {
+        return false;
+    }
+
+    /**
+     * Return false directly as not aware of NodeGroup, to be override in sub-class
+     */
+    public boolean isOnSameNodeGroup(Node node1, Node node2) {
+        return false;
+    }
+
+    /**
+     * Compare the parents of each node for equality
+     *
+     * <p>To be overridden in subclasses for specific NetworkTopology
+     * implementations, as alternative to overriding the full
+     * {@link #isOnSameRack(Node, Node)} method.
+     *
+     * @param node1 the first node to compare
+     * @param node2 the second node to compare
+     * @return true if their parents are equal, false otherwise
+     *
+     * @see #isOnSameRack(Node, Node)
+     */
+    protected boolean isSameParents(Node node1, Node node2) {
+        return node1.getParent() == node2.getParent();
+    }
+
+    final protected static Random r = new Random();
+
+    /** randomly choose one node from <i>scope</i>
+     * if scope starts with ~, choose one from the all nodes except for the
+     * ones in <i>scope</i>; otherwise, choose one from <i>scope</i>
+     * @param scope range of nodes from which a node will be chosen
+     * @return the chosen node
+     */
+    public Node chooseRandom(String scope) {
+        netlock.readLock().lock();
+        try {
+            if (scope.startsWith("~")) {
+                return chooseRandom(NodeBase.ROOT, scope.substring(1));
+            } else {
+                return chooseRandom(scope, null);
+            }
+        } finally {
+            netlock.readLock().unlock();
+        }
+    }
+
+    private Node chooseRandom(String scope, String excludedScope) {
+        if (excludedScope != null) {
+            if (scope.startsWith(excludedScope)) {
+                return null;
+            }
+            if (!excludedScope.startsWith(scope)) {
+                excludedScope = null;
+            }
+        }
+        Node node = getNode(scope);
+        if (!(node instanceof InnerNode)) {
+            return node;
+        }
+        InnerNode innerNode = (InnerNode) node;
+        int numOfDatanodes = innerNode.getNumOfLeaves();
+        if (excludedScope == null) {
+            node = null;
+        } else {
+            node = getNode(excludedScope);
+            if (!(node instanceof InnerNode)) {
+                numOfDatanodes -= 1;
+            } else {
+                numOfDatanodes -= ((InnerNode) node).getNumOfLeaves();
+            }
+        }
+        int leaveIndex = r.nextInt(numOfDatanodes);
+        return innerNode.getLeaf(leaveIndex, node);
+    }
+
+    /** return leaves in <i>scope</i>
+     * @param scope a path string
+     * @return leaves nodes under specific scope
+     */
+    private Set<Node> doGetLeaves(String scope) {
+        Node node = getNode(scope);
+        Set<Node> leafNodes = new HashSet<Node>();
+        if (!(node instanceof InnerNode)) {
+            leafNodes.add(node);
+        } else {
+            InnerNode innerNode = (InnerNode) node;
+            for (int i = 0; i < innerNode.getNumOfLeaves(); i++) {
+                leafNodes.add(innerNode.getLeaf(i, null));
+            }
+        }
+        return leafNodes;
+    }
+
+    public Set<Node> getLeaves(String scope) {
+        netlock.readLock().lock();
+        try {
+            if (scope.startsWith("~")) {
+                Set<Node> allNodes = doGetLeaves(NodeBase.ROOT);
+                Set<Node> excludeNodes = doGetLeaves(scope.substring(1));
+                allNodes.removeAll(excludeNodes);
+                return allNodes;
+            } else {
+                return doGetLeaves(scope);
+            }
+        } finally {
+            netlock.readLock().unlock();
+        }
+    }
+
+    /** return the number of leaves in <i>scope</i> but not in <i>excludedNodes</i>
+     * if scope starts with ~, return the number of nodes that are not
+     * in <i>scope</i> and <i>excludedNodes</i>;
+     * @param scope a path string that may start with ~
+     * @param excludedNodes a list of nodes
+     * @return number of available nodes
+     */
+    public int countNumOfAvailableNodes(String scope, Collection<Node> excludedNodes) {
+        boolean isExcluded = false;
+        if (scope.startsWith("~")) {
+            isExcluded = true;
+            scope = scope.substring(1);
+        }
+        scope = NodeBase.normalize(scope);
+        int count = 0; // the number of nodes in both scope & excludedNodes
+        netlock.readLock().lock();
+        try {
+            for (Node node : excludedNodes) {
+                if ((NodeBase.getPath(node) + NodeBase.PATH_SEPARATOR_STR).startsWith(scope
+                        + NodeBase.PATH_SEPARATOR_STR)) {
+                    count++;
+                }
+            }
+            Node n = getNode(scope);
+            int scopeNodeCount = 1;
+            if (n instanceof InnerNode) {
+                scopeNodeCount = ((InnerNode) n).getNumOfLeaves();
+            }
+            if (isExcluded) {
+                return clusterMap.getNumOfLeaves() - scopeNodeCount - excludedNodes.size() + count;
+            } else {
+                return scopeNodeCount - count;
+            }
+        } finally {
+            netlock.readLock().unlock();
+        }
+    }
+
+    /** convert a network tree to a string */
+    @Override
+    public String toString() {
+        // print the number of racks
+        StringBuilder tree = new StringBuilder();
+        tree.append("Number of racks: ");
+        tree.append(numOfRacks);
+        tree.append("\n");
+        // print the number of leaves
+        int numOfLeaves = getNumOfLeaves();
+        tree.append("Expected number of leaves:");
+        tree.append(numOfLeaves);
+        tree.append("\n");
+        // print nodes
+        for (int i = 0; i < numOfLeaves; i++) {
+            tree.append(NodeBase.getPath(clusterMap.getLeaf(i, null)));
+            tree.append("\n");
+        }
+        return tree.toString();
+    }
+
+    /**
+     * Divide networklocation string into two parts by last separator, and get
+     * the first part here.
+     *
+     * @param networkLocation
+     * @return
+     */
+    public static String getFirstHalf(String networkLocation) {
+        int index = networkLocation.lastIndexOf(NodeBase.PATH_SEPARATOR_STR);
+        return networkLocation.substring(0, index);
+    }
+
+    /**
+     * Divide networklocation string into two parts by last separator, and get
+     * the second part here.
+     *
+     * @param networkLocation
+     * @return
+     */
+    public static String getLastHalf(String networkLocation) {
+        int index = networkLocation.lastIndexOf(NodeBase.PATH_SEPARATOR_STR);
+        return networkLocation.substring(index);
+    }
+
+    /** swap two array items */
+    static protected void swap(Node[] nodes, int i, int j) {
+        Node tempNode;
+        tempNode = nodes[j];
+        nodes[j] = nodes[i];
+        nodes[i] = tempNode;
+    }
+
+    /** Sort nodes array by their distances to <i>reader</i>
+     * It linearly scans the array, if a local node is found, swap it with
+     * the first element of the array.
+     * If a local rack node is found, swap it with the first element following
+     * the local node.
+     * If neither local node or local rack node is found, put a random replica
+     * location at position 0.
+     * It leaves the rest nodes untouched.
+     * @param reader the node that wishes to read a block from one of the nodes
+     * @param nodes the list of nodes containing data for the reader
+     */
+    public void pseudoSortByDistance(Node reader, Node[] nodes) {
+        int tempIndex = 0;
+        int localRackNode = -1;
+        if (reader != null) {
+            //scan the array to find the local node & local rack node
+            for (int i = 0; i < nodes.length; i++) {
+                if (tempIndex == 0 && reader == nodes[i]) { //local node
+                    //swap the local node and the node at position 0
+                    if (i != 0) {
+                        swap(nodes, tempIndex, i);
+                    }
+                    tempIndex = 1;
+                    if (localRackNode != -1) {
+                        if (localRackNode == 0) {
+                            localRackNode = i;
+                        }
+                        break;
+                    }
+                } else if (localRackNode == -1 && isOnSameRack(reader, nodes[i])) {
+                    //local rack
+                    localRackNode = i;
+                    if (tempIndex != 0)
+                        break;
+                }
+            }
+
+            // swap the local rack node and the node at position tempIndex
+            if (localRackNode != -1 && localRackNode != tempIndex) {
+                swap(nodes, tempIndex, localRackNode);
+                tempIndex++;
+            }
+        }
+
+        // put a random node at position 0 if it is not a local/local-rack node
+        if (tempIndex == 0 && localRackNode == -1 && nodes.length != 0) {
+            swap(nodes, 0, r.nextInt(nodes.length));
+        }
+    }
+
+}

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/Node.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/Node.java?rev=1492274&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/Node.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/Node.java Wed Jun 12 16:41:47 2013
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.net;
+
+import com.google.common.annotations.Beta;
+
+/** The interface defines a node in a network topology.
+ * A node may be a leave representing a data node or an inner
+ * node representing a datacenter or rack.
+ * Each data has a name and its location in the network is
+ * decided by a string with syntax similar to a file name.
+ * For example, a data node's name is hostname:port# and if it's located at
+ * rack "orange" in datacenter "dog", the string representation of its
+ * network location is /dog/orange
+ */
+@Beta
+public interface Node {
+    /** @return the string representation of this node's network location */
+    public String getNetworkLocation();
+
+    /** Set this node's network location
+     * @param location the location
+     */
+    public void setNetworkLocation(String location);
+
+    /** @return this node's name */
+    public String getName();
+
+    /** @return this node's parent */
+    public Node getParent();
+
+    /** Set this node's parent
+     * @param parent the parent
+     */
+    public void setParent(Node parent);
+
+    /** @return this node's level in the tree.
+     * E.g. the root of a tree returns 0 and its children return 1
+     */
+    public int getLevel();
+
+    /** Set this node's level in the tree
+     * @param i the level
+     */
+    public void setLevel(int i);
+}

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NodeBase.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NodeBase.java?rev=1492274&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NodeBase.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NodeBase.java Wed Jun 12 16:41:47 2013
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.net;
+
+/** A base class that implements interface Node
+ *
+ */
+public class NodeBase implements Node {
+    /** Path separator {@value} */
+    public final static char PATH_SEPARATOR = '/';
+    /** Path separator as a string {@value} */
+    public final static String PATH_SEPARATOR_STR = "/";
+    /** string representation of root {@value} */
+    public final static String ROOT = "";
+
+    protected String name; //host:port#
+    protected String location; //string representation of this node's location
+    protected int level; //which level of the tree the node resides
+    protected Node parent; //its parent
+
+    /** Default constructor */
+    public NodeBase() {
+    }
+
+    /** Construct a node from its path
+     * @param path
+     *   a concatenation of this node's location, the path seperator, and its name
+     */
+    public NodeBase(String path) {
+        path = normalize(path);
+        int index = path.lastIndexOf(PATH_SEPARATOR);
+        if (index == -1) {
+            set(ROOT, path);
+        } else {
+            set(path.substring(index + 1), path.substring(0, index));
+        }
+    }
+
+    /** Construct a node from its name and its location
+     * @param name this node's name (can be null, must not contain {@link #PATH_SEPARATOR})
+     * @param location this node's location
+     */
+    public NodeBase(String name, String location) {
+        set(name, normalize(location));
+    }
+
+    /** Construct a node from its name and its location
+     * @param name this node's name (can be null, must not contain {@link #PATH_SEPARATOR})
+     * @param location this node's location
+     * @param parent this node's parent node
+     * @param level this node's level in the tree
+     */
+    public NodeBase(String name, String location, Node parent, int level) {
+        set(name, normalize(location));
+        this.parent = parent;
+        this.level = level;
+    }
+
+    /**
+     * set this node's name and location
+     * @param name the (nullable) name -which cannot contain the {@link #PATH_SEPARATOR}
+     * @param location the location
+     */
+    private void set(String name, String location) {
+        if (name != null && name.contains(PATH_SEPARATOR_STR))
+            throw new IllegalArgumentException("Network location name contains /: " + name);
+        this.name = (name == null) ? "" : name;
+        this.location = location;
+    }
+
+    /** @return this node's name */
+    @Override
+    public String getName() {
+        return name;
+    }
+
+    /** @return this node's network location */
+    @Override
+    public String getNetworkLocation() {
+        return location;
+    }
+
+    /** Set this node's network location
+     * @param location the location
+     */
+    @Override
+    public void setNetworkLocation(String location) {
+        this.location = location;
+    }
+
+    /**
+     * Get the path of a node
+     * @param node a non-null node
+     * @return the path of a node
+     */
+    public static String getPath(Node node) {
+        return node.getNetworkLocation() + PATH_SEPARATOR_STR + node.getName();
+    }
+
+    /** @return this node's path as its string representation */
+    @Override
+    public String toString() {
+        return getPath(this);
+    }
+
+    /** Normalize a path by stripping off any trailing {@link #PATH_SEPARATOR}
+     * @param path path to normalize.
+     * @return the normalised path
+     * If <i>path</i>is null or empty {@link #ROOT} is returned
+     * @throws IllegalArgumentException if the first character of a non empty path
+     * is not {@link #PATH_SEPARATOR}
+     */
+    public static String normalize(String path) {
+        if (path == null || path.length() == 0)
+            return ROOT;
+
+        if (path.charAt(0) != PATH_SEPARATOR) {
+            throw new IllegalArgumentException("Network Location path does not start with " + PATH_SEPARATOR_STR + ": "
+                    + path);
+        }
+
+        int len = path.length();
+        if (path.charAt(len - 1) == PATH_SEPARATOR) {
+            return path.substring(0, len - 1);
+        }
+        return path;
+    }
+
+    /** @return this node's parent */
+    @Override
+    public Node getParent() {
+        return parent;
+    }
+
+    /** Set this node's parent
+     * @param parent the parent
+     */
+    @Override
+    public void setParent(Node parent) {
+        this.parent = parent;
+    }
+
+    /** @return this node's level in the tree.
+     * E.g. the root of a tree returns 0 and its children return 1
+     */
+    @Override
+    public int getLevel() {
+        return level;
+    }
+
+    /** Set this node's level in the tree
+     * @param level the level
+     */
+    @Override
+    public void setLevel(int level) {
+        this.level = level;
+    }
+
+    public static int locationToDepth(String location) {
+        String normalizedLocation = normalize(location);
+        int length = normalizedLocation.length();
+        int depth = 0;
+        for (int i = 0; i < length; i++) {
+            if (normalizedLocation.charAt(i) == PATH_SEPARATOR) {
+                depth++;
+            }
+        }
+        return depth;
+    }
+}

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/ScriptBasedMapping.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/ScriptBasedMapping.java?rev=1492274&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/ScriptBasedMapping.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/ScriptBasedMapping.java Wed Jun 12 16:41:47 2013
@@ -0,0 +1,264 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.net;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.StringTokenizer;
+
+import org.apache.bookkeeper.util.Shell.ShellCommandExecutor;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * This class implements the {@link DNSToSwitchMapping} interface using a
+ * script configured via the
+ * {@link CommonConfigurationKeys#NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY} option.
+ * <p/>
+ * It contains a static class <code>RawScriptBasedMapping</code> that performs
+ * the work: reading the configuration parameters, executing any defined
+ * script, handling errors and such like. The outer
+ * class extends {@link CachedDNSToSwitchMapping} to cache the delegated
+ * queries.
+ * <p/>
+ * This DNS mapper's {@link #isSingleSwitch()} predicate returns
+ * true if and only if a script is defined.
+ */
+public final class ScriptBasedMapping extends CachedDNSToSwitchMapping {
+
+    /**
+     * Minimum number of arguments: {@value}
+     */
+    static final int MIN_ALLOWABLE_ARGS = 1;
+
+    /**
+     * Default number of arguments: {@value}
+     */
+    static final int DEFAULT_ARG_COUNT = CommonConfigurationKeys.NET_TOPOLOGY_SCRIPT_NUMBER_ARGS_DEFAULT;
+
+    /**
+     * key to the script filename {@value}
+     */
+    static final String SCRIPT_FILENAME_KEY = CommonConfigurationKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY;
+    /**
+     * key to the argument count that the script supports
+     * {@value}
+     */
+    static final String SCRIPT_ARG_COUNT_KEY = CommonConfigurationKeys.NET_TOPOLOGY_SCRIPT_NUMBER_ARGS_KEY;
+    /**
+     * Text used in the {@link #toString()} method if there is no string
+     * {@value}
+     */
+    public static final String NO_SCRIPT = "no script";
+
+    /**
+     * Create an instance with the default configuration.
+     * </p>
+     * Calling {@link #setConf(Configuration)} will trigger a
+     * re-evaluation of the configuration settings and so be used to
+     * set up the mapping script.
+     *
+     */
+    public ScriptBasedMapping() {
+        super(new RawScriptBasedMapping());
+    }
+
+    /**
+     * Create an instance from the given configuration
+     * @param conf configuration
+     */
+    public ScriptBasedMapping(Configuration conf) {
+        this();
+        setConf(conf);
+    }
+
+    /**
+     * Get the cached mapping and convert it to its real type
+     * @return the inner raw script mapping.
+     */
+    private RawScriptBasedMapping getRawMapping() {
+        return (RawScriptBasedMapping) rawMapping;
+    }
+
+    @Override
+    public Configuration getConf() {
+        return getRawMapping().getConf();
+    }
+
+    @Override
+    public String toString() {
+        return "script-based mapping with " + getRawMapping().toString();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p/>
+     * This will get called in the superclass constructor, so a check is needed
+     * to ensure that the raw mapping is defined before trying to relaying a null
+     * configuration.
+     * @param conf
+     */
+    @Override
+    public void setConf(Configuration conf) {
+        super.setConf(conf);
+        getRawMapping().setConf(conf);
+    }
+
+    /**
+     * This is the uncached script mapping that is fed into the cache managed
+     * by the superclass {@link CachedDNSToSwitchMapping}
+     */
+    private static final class RawScriptBasedMapping extends AbstractDNSToSwitchMapping {
+        private String scriptName;
+        private int maxArgs; //max hostnames per call of the script
+        private static final Log LOG = LogFactory.getLog(ScriptBasedMapping.class);
+
+        /**
+         * Set the configuration and extract the configuration parameters of interest
+         * @param conf the new configuration
+         */
+        @Override
+        public void setConf(Configuration conf) {
+            super.setConf(conf);
+            if (conf != null) {
+                scriptName = conf.getString(SCRIPT_FILENAME_KEY);
+                maxArgs = conf.getInt(SCRIPT_ARG_COUNT_KEY, DEFAULT_ARG_COUNT);
+            } else {
+                scriptName = null;
+                maxArgs = 0;
+            }
+        }
+
+        /**
+         * Constructor. The mapping is not ready to use until
+         * {@link #setConf(Configuration)} has been called
+         */
+        public RawScriptBasedMapping() {
+        }
+
+        @Override
+        public List<String> resolve(List<String> names) {
+            List<String> m = new ArrayList<String>(names.size());
+
+            if (names.isEmpty()) {
+                return m;
+            }
+
+            if (scriptName == null) {
+                for (int i = 0; i < names.size(); i++) {
+                    m.add(NetworkTopology.DEFAULT_RACK);
+                }
+                return m;
+            }
+
+            String output = runResolveCommand(names);
+            if (output != null) {
+                StringTokenizer allSwitchInfo = new StringTokenizer(output);
+                while (allSwitchInfo.hasMoreTokens()) {
+                    String switchInfo = allSwitchInfo.nextToken();
+                    m.add(switchInfo);
+                }
+
+                if (m.size() != names.size()) {
+                    // invalid number of entries returned by the script
+                    LOG.error("Script " + scriptName + " returned " + Integer.toString(m.size()) + " values when "
+                            + Integer.toString(names.size()) + " were expected.");
+                    return null;
+                }
+            } else {
+                // an error occurred. return null to signify this.
+                // (exn was already logged in runResolveCommand)
+                return null;
+            }
+
+            return m;
+        }
+
+        /**
+         * Build and execute the resolution command. The command is
+         * executed in the directory specified by the system property
+         * "user.dir" if set; otherwise the current working directory is used
+         * @param args a list of arguments
+         * @return null if the number of arguments is out of range,
+         * or the output of the command.
+         */
+        private String runResolveCommand(List<String> args) {
+            int loopCount = 0;
+            if (args.size() == 0) {
+                return null;
+            }
+            StringBuilder allOutput = new StringBuilder();
+            int numProcessed = 0;
+            if (maxArgs < MIN_ALLOWABLE_ARGS) {
+                LOG.warn("Invalid value " + Integer.toString(maxArgs) + " for " + SCRIPT_ARG_COUNT_KEY
+                        + "; must be >= " + Integer.toString(MIN_ALLOWABLE_ARGS));
+                return null;
+            }
+
+            while (numProcessed != args.size()) {
+                int start = maxArgs * loopCount;
+                List<String> cmdList = new ArrayList<String>();
+                cmdList.add(scriptName);
+                for (numProcessed = start;
+                     numProcessed < (start + maxArgs) && numProcessed < args.size();
+                     numProcessed++) {
+                    cmdList.add(args.get(numProcessed));
+                }
+                File dir = null;
+                String userDir;
+                if ((userDir = System.getProperty("user.dir")) != null) {
+                    dir = new File(userDir);
+                }
+                ShellCommandExecutor s = new ShellCommandExecutor(cmdList.toArray(new String[cmdList.size()]), dir);
+                try {
+                    s.execute();
+                    allOutput.append(s.getOutput()).append(" ");
+                } catch (Exception e) {
+                    LOG.warn("Exception running " + s, e);
+                    return null;
+                }
+                loopCount++;
+            }
+            return allOutput.toString();
+        }
+
+        /**
+         * Declare that the mapper is single-switched if a script was not named
+         * in the configuration.
+         * @return true iff there is no script
+         */
+        @Override
+        public boolean isSingleSwitch() {
+            return scriptName == null;
+        }
+
+        @Override
+        public String toString() {
+            return scriptName != null ? ("script " + scriptName) : NO_SCRIPT;
+        }
+
+        @Override
+        public void reloadCachedMappings() {
+            // Nothing to do here, since RawScriptBasedMapping has no cache, and
+            // does not inherit from CachedDNSToSwitchMapping
+        }
+    }
+}

Added: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/Shell.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/Shell.java?rev=1492274&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/Shell.java (added)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/Shell.java Wed Jun 12 16:41:47 2013
@@ -0,0 +1,677 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.util;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.google.common.base.Charsets;
+
+/**
+ * A base class for running a Unix command.
+ *
+ * <code>Shell</code> can be used to run unix commands like <code>du</code> or
+ * <code>df</code>. It also offers facilities to gate commands by
+ * time-intervals.
+ */
+abstract public class Shell {
+
+    public static final Log LOG = LogFactory.getLog(Shell.class);
+
+    private static boolean IS_JAVA7_OR_ABOVE =
+        System.getProperty("java.version").substring(0, 3).compareTo("1.7") >= 0;
+
+    public static boolean isJava7OrAbove() {
+        return IS_JAVA7_OR_ABOVE;
+    }
+
+    /** a Unix command to get the current user's name */
+    public final static String USER_NAME_COMMAND = "whoami";
+
+    /** Windows CreateProcess synchronization object */
+    public static final Object WindowsProcessLaunchLock = new Object();
+
+    /** a Unix command to get the current user's groups list */
+    public static String[] getGroupsCommand() {
+        return (WINDOWS) ? new String[] { "cmd", "/c", "groups" } : new String[] { "bash", "-c", "groups" };
+    }
+
+    /** a Unix command to get a given user's groups list */
+    public static String[] getGroupsForUserCommand(final String user) {
+        //'groups username' command return is non-consistent across different unixes
+        return (WINDOWS) ? new String[] { WINUTILS, "groups", "-F", "\"" + user + "\"" } : new String[] { "bash", "-c",
+                "id -Gn " + user };
+    }
+
+    /** a Unix command to get a given netgroup's user list */
+    public static String[] getUsersForNetgroupCommand(final String netgroup) {
+        //'groups username' command return is non-consistent across different unixes
+        return (WINDOWS) ? new String[] { "cmd", "/c", "getent netgroup " + netgroup } : new String[] { "bash", "-c",
+                "getent netgroup " + netgroup };
+    }
+
+    /** Return a command to get permission information. */
+    public static String[] getGetPermissionCommand() {
+        return (WINDOWS) ? new String[] { WINUTILS, "ls", "-F" } : new String[] { "/bin/ls", "-ld" };
+    }
+
+    /** Return a command to set permission */
+    public static String[] getSetPermissionCommand(String perm, boolean recursive) {
+        if (recursive) {
+            return (WINDOWS) ? new String[] { WINUTILS, "chmod", "-R", perm } : new String[] { "chmod", "-R", perm };
+        } else {
+            return (WINDOWS) ? new String[] { WINUTILS, "chmod", perm } : new String[] { "chmod", perm };
+        }
+    }
+
+    /**
+     * Return a command to set permission for specific file.
+     *
+     * @param perm String permission to set
+     * @param recursive boolean true to apply to all sub-directories recursively
+     * @param file String file to set
+     * @return String[] containing command and arguments
+     */
+    public static String[] getSetPermissionCommand(String perm, boolean recursive, String file) {
+        String[] baseCmd = getSetPermissionCommand(perm, recursive);
+        String[] cmdWithFile = Arrays.copyOf(baseCmd, baseCmd.length + 1);
+        cmdWithFile[cmdWithFile.length - 1] = file;
+        return cmdWithFile;
+    }
+
+    /** Return a command to set owner */
+    public static String[] getSetOwnerCommand(String owner) {
+        return (WINDOWS) ? new String[] { WINUTILS, "chown", "\"" + owner + "\"" } : new String[] { "chown", owner };
+    }
+
+    /** Return a command to create symbolic links */
+    public static String[] getSymlinkCommand(String target, String link) {
+        return WINDOWS ? new String[] { WINUTILS, "symlink", link, target } : new String[] { "ln", "-s", target, link };
+    }
+
+    /** Return a command for determining if process with specified pid is alive. */
+    public static String[] getCheckProcessIsAliveCommand(String pid) {
+        return Shell.WINDOWS ? new String[] { Shell.WINUTILS, "task", "isAlive", pid } : new String[] { "kill", "-0",
+                isSetsidAvailable ? "-" + pid : pid };
+    }
+
+    /** Return a command to send a signal to a given pid */
+    public static String[] getSignalKillCommand(int code, String pid) {
+        return Shell.WINDOWS ? new String[] { Shell.WINUTILS, "task", "kill", pid } : new String[] { "kill",
+                "-" + code, isSetsidAvailable ? "-" + pid : pid };
+    }
+
+    /**
+     * Returns a File referencing a script with the given basename, inside the
+     * given parent directory.  The file extension is inferred by platform: ".cmd"
+     * on Windows, or ".sh" otherwise.
+     *
+     * @param parent File parent directory
+     * @param basename String script file basename
+     * @return File referencing the script in the directory
+     */
+    public static File appendScriptExtension(File parent, String basename) {
+        return new File(parent, appendScriptExtension(basename));
+    }
+
+    /**
+     * Returns a script file name with the given basename.  The file extension is
+     * inferred by platform: ".cmd" on Windows, or ".sh" otherwise.
+     *
+     * @param basename String script file basename
+     * @return String script file name
+     */
+    public static String appendScriptExtension(String basename) {
+        return basename + (WINDOWS ? ".cmd" : ".sh");
+    }
+
+    /**
+     * Returns a command to run the given script.  The script interpreter is
+     * inferred by platform: cmd on Windows or bash otherwise.
+     *
+     * @param script File script to run
+     * @return String[] command to run the script
+     */
+    public static String[] getRunScriptCommand(File script) {
+        String absolutePath = script.getAbsolutePath();
+        return WINDOWS ? new String[] { "cmd", "/c", absolutePath } : new String[] { "/bin/bash", absolutePath };
+    }
+
+    /** a Unix command to set permission */
+    public static final String SET_PERMISSION_COMMAND = "chmod";
+    /** a Unix command to set owner */
+    public static final String SET_OWNER_COMMAND = "chown";
+
+    /** a Unix command to set the change user's groups list */
+    public static final String SET_GROUP_COMMAND = "chgrp";
+    /** a Unix command to create a link */
+    public static final String LINK_COMMAND = "ln";
+    /** a Unix command to get a link target */
+    public static final String READ_LINK_COMMAND = "readlink";
+
+    /**Time after which the executing script would be timedout*/
+    protected long timeOutInterval = 0L;
+    /** If or not script timed out*/
+    private AtomicBoolean timedOut;
+
+    /** Centralized logic to discover and validate the sanity of the Hadoop
+     *  home directory. Returns either NULL or a directory that exists and
+     *  was specified via either -Dhadoop.home.dir or the HADOOP_HOME ENV
+     *  variable.  This does a lot of work so it should only be called
+     *  privately for initialization once per process.
+     **/
+    private static String checkHadoopHome() {
+
+        // first check the Dflag hadoop.home.dir with JVM scope
+        String home = System.getProperty("hadoop.home.dir");
+
+        // fall back to the system/user-global env variable
+        if (home == null) {
+            home = System.getenv("HADOOP_HOME");
+        }
+
+        try {
+            // couldn't find either setting for hadoop's home directory
+            if (home == null) {
+                throw new IOException("HADOOP_HOME or hadoop.home.dir are not set.");
+            }
+
+            if (home.startsWith("\"") && home.endsWith("\"")) {
+                home = home.substring(1, home.length() - 1);
+            }
+
+            // check that the home setting is actually a directory that exists
+            File homedir = new File(home);
+            if (!homedir.isAbsolute() || !homedir.exists() || !homedir.isDirectory()) {
+                throw new IOException("Hadoop home directory " + homedir
+                        + " does not exist, is not a directory, or is not an absolute path.");
+            }
+
+            home = homedir.getCanonicalPath();
+
+        } catch (IOException ioe) {
+            LOG.error("Failed to detect a valid hadoop home directory", ioe);
+            home = null;
+        }
+
+        return home;
+    }
+
+    private static String HADOOP_HOME_DIR = checkHadoopHome();
+
+    // Public getter, throws an exception if HADOOP_HOME failed validation
+    // checks and is being referenced downstream.
+    public static final String getHadoopHome() throws IOException {
+        if (HADOOP_HOME_DIR == null) {
+            throw new IOException("Misconfigured HADOOP_HOME cannot be referenced.");
+        }
+
+        return HADOOP_HOME_DIR;
+    }
+
+    /** fully qualify the path to a binary that should be in a known hadoop
+     *  bin location. This is primarily useful for disambiguating call-outs
+     *  to executable sub-components of Hadoop to avoid clashes with other
+     *  executables that may be in the path.  Caveat:  this call doesn't
+     *  just format the path to the bin directory.  It also checks for file
+     *  existence of the composed path. The output of this call should be
+     *  cached by callers.
+     * */
+    public static final String getQualifiedBinPath(String executable) throws IOException {
+        // construct hadoop bin path to the specified executable
+        String fullExeName = HADOOP_HOME_DIR + File.separator + "bin" + File.separator + executable;
+
+        File exeFile = new File(fullExeName);
+        if (!exeFile.exists()) {
+            throw new IOException("Could not locate executable " + fullExeName + " in the Hadoop binaries.");
+        }
+
+        return exeFile.getCanonicalPath();
+    }
+
+    /** Set to true on Windows platforms */
+    public static final boolean WINDOWS /* borrowed from Path.WINDOWS */
+    = System.getProperty("os.name").startsWith("Windows");
+
+    public static final boolean LINUX = System.getProperty("os.name").startsWith("Linux");
+
+    /** a Windows utility to emulate Unix commands */
+    public static final String WINUTILS = getWinUtilsPath();
+
+    public static final String getWinUtilsPath() {
+        String winUtilsPath = null;
+
+        try {
+            if (WINDOWS) {
+                winUtilsPath = getQualifiedBinPath("winutils.exe");
+            }
+        } catch (IOException ioe) {
+            LOG.error("Failed to locate the winutils binary in the hadoop binary path", ioe);
+        }
+
+        return winUtilsPath;
+    }
+
+    public static final boolean isSetsidAvailable = isSetsidSupported();
+
+    private static boolean isSetsidSupported() {
+        if (Shell.WINDOWS) {
+            return false;
+        }
+        ShellCommandExecutor shexec = null;
+        boolean setsidSupported = true;
+        try {
+            String[] args = { "setsid", "bash", "-c", "echo $$" };
+            shexec = new ShellCommandExecutor(args);
+            shexec.execute();
+        } catch (IOException ioe) {
+            LOG.warn("setsid is not available on this machine. So not using it.");
+            setsidSupported = false;
+        } finally { // handle the exit code
+            if (null != shexec) {
+                LOG.info("setsid exited with exit code " + shexec.getExitCode());
+            }
+        }
+        return setsidSupported;
+    }
+
+    /** Token separator regex used to parse Shell tool outputs */
+    public static final String TOKEN_SEPARATOR_REGEX = WINDOWS ? "[|\n\r]" : "[ \t\n\r\f]";
+
+    private long interval; // refresh interval in msec
+    private long lastTime; // last time the command was performed
+    private Map<String, String> environment; // env for the command execution
+    private File dir;
+    private Process process; // sub process used to execute the command
+    private int exitCode;
+
+    /**If or not script finished executing*/
+    private volatile AtomicBoolean completed;
+
+    public Shell() {
+        this(0L);
+    }
+
+    /**
+     * @param interval the minimum duration to wait before re-executing the
+     *        command.
+     */
+    public Shell(long interval) {
+        this.interval = interval;
+        this.lastTime = (interval < 0) ? 0 : -interval;
+    }
+
+    /** set the environment for the command
+     * @param env Mapping of environment variables
+     */
+    protected void setEnvironment(Map<String, String> env) {
+        this.environment = env;
+    }
+
+    /** set the working directory
+     * @param dir The directory where the command would be executed
+     */
+    protected void setWorkingDirectory(File dir) {
+        this.dir = dir;
+    }
+
+    /** check to see if a command needs to be executed and execute if needed */
+    protected void run() throws IOException {
+        if (lastTime + interval > MathUtils.now())
+            return;
+        exitCode = 0; // reset for next run
+        runCommand();
+    }
+
+    /** Run a command */
+    private void runCommand() throws IOException {
+        ProcessBuilder builder = new ProcessBuilder(getExecString());
+        Timer timeOutTimer = null;
+        ShellTimeoutTimerTask timeoutTimerTask = null;
+        timedOut = new AtomicBoolean(false);
+        completed = new AtomicBoolean(false);
+
+        if (environment != null) {
+            builder.environment().putAll(this.environment);
+        }
+        if (dir != null) {
+            builder.directory(this.dir);
+        }
+
+        if (Shell.WINDOWS) {
+            synchronized (WindowsProcessLaunchLock) {
+                // To workaround the race condition issue with child processes
+                // inheriting unintended handles during process launch that can
+                // lead to hangs on reading output and error streams, we
+                // serialize process creation. More info available at:
+                // http://support.microsoft.com/kb/315939
+                process = builder.start();
+            }
+        } else {
+            process = builder.start();
+        }
+
+        if (timeOutInterval > 0) {
+            timeOutTimer = new Timer("Shell command timeout");
+            timeoutTimerTask = new ShellTimeoutTimerTask(this);
+            //One time scheduling.
+            timeOutTimer.schedule(timeoutTimerTask, timeOutInterval);
+        }
+        final BufferedReader errReader = new BufferedReader(new InputStreamReader(process.getErrorStream(),
+                Charsets.UTF_8));
+        BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream(), Charsets.UTF_8));
+        final StringBuffer errMsg = new StringBuffer();
+
+        // read error and input streams as this would free up the buffers
+        // free the error stream buffer
+        Thread errThread = new Thread() {
+            @Override
+            public void run() {
+                try {
+                    String line = errReader.readLine();
+                    while ((line != null) && !isInterrupted()) {
+                        errMsg.append(line);
+                        errMsg.append(System.getProperty("line.separator"));
+                        line = errReader.readLine();
+                    }
+                } catch (IOException ioe) {
+                    LOG.warn("Error reading the error stream", ioe);
+                }
+            }
+        };
+        try {
+            errThread.start();
+        } catch (IllegalStateException ise) {
+        }
+        try {
+            parseExecResult(inReader); // parse the output
+            // clear the input stream buffer
+            String line = inReader.readLine();
+            while (line != null) {
+                line = inReader.readLine();
+            }
+            // wait for the process to finish and check the exit code
+            exitCode = process.waitFor();
+            try {
+                // make sure that the error thread exits
+                errThread.join();
+            } catch (InterruptedException ie) {
+                LOG.warn("Interrupted while reading the error stream", ie);
+            }
+            completed.set(true);
+            //the timeout thread handling
+            //taken care in finally block
+            if (exitCode != 0) {
+                throw new ExitCodeException(exitCode, errMsg.toString());
+            }
+        } catch (InterruptedException ie) {
+            throw new IOException(ie.toString());
+        } finally {
+            if (timeOutTimer != null) {
+                timeOutTimer.cancel();
+            }
+            // close the input stream
+            try {
+                inReader.close();
+            } catch (IOException ioe) {
+                LOG.warn("Error while closing the input stream", ioe);
+            }
+            if (!completed.get()) {
+                errThread.interrupt();
+            }
+            try {
+                errReader.close();
+            } catch (IOException ioe) {
+                LOG.warn("Error while closing the error stream", ioe);
+            }
+            process.destroy();
+            lastTime = MathUtils.now();
+        }
+    }
+
+    /** return an array containing the command name & its parameters */
+    protected abstract String[] getExecString();
+
+    /** Parse the execution result */
+    protected abstract void parseExecResult(BufferedReader lines) throws IOException;
+
+    /** get the current sub-process executing the given command
+     * @return process executing the command
+     */
+    public Process getProcess() {
+        return process;
+    }
+
+    /** get the exit code
+     * @return the exit code of the process
+     */
+    public int getExitCode() {
+        return exitCode;
+    }
+
+    /**
+     * This is an IOException with exit code added.
+     */
+    public static class ExitCodeException extends IOException {
+        private static final long serialVersionUID = 2241095121609500810L;
+
+        int exitCode;
+
+        public ExitCodeException(int exitCode, String message) {
+            super(message);
+            this.exitCode = exitCode;
+        }
+
+        public int getExitCode() {
+            return exitCode;
+        }
+    }
+
+    /**
+     * A simple shell command executor.
+     *
+     * <code>ShellCommandExecutor</code>should be used in cases where the output
+     * of the command needs no explicit parsing and where the command, working
+     * directory and the environment remains unchanged. The output of the command
+     * is stored as-is and is expected to be small.
+     */
+    public static class ShellCommandExecutor extends Shell {
+
+        private String[] command;
+        private StringBuffer output;
+
+        public ShellCommandExecutor(String[] execString) {
+            this(execString, null);
+        }
+
+        public ShellCommandExecutor(String[] execString, File dir) {
+            this(execString, dir, null);
+        }
+
+        public ShellCommandExecutor(String[] execString, File dir, Map<String, String> env) {
+            this(execString, dir, env, 0L);
+        }
+
+        /**
+         * Create a new instance of the ShellCommandExecutor to execute a command.
+         *
+         * @param execString The command to execute with arguments
+         * @param dir If not-null, specifies the directory which should be set
+         *            as the current working directory for the command.
+         *            If null, the current working directory is not modified.
+         * @param env If not-null, environment of the command will include the
+         *            key-value pairs specified in the map. If null, the current
+         *            environment is not modified.
+         * @param timeout Specifies the time in milliseconds, after which the
+         *                command will be killed and the status marked as timedout.
+         *                If 0, the command will not be timed out.
+         */
+        public ShellCommandExecutor(String[] execString, File dir, Map<String, String> env, long timeout) {
+            command = execString.clone();
+            if (dir != null) {
+                setWorkingDirectory(dir);
+            }
+            if (env != null) {
+                setEnvironment(env);
+            }
+            timeOutInterval = timeout;
+        }
+
+        /** Execute the shell command. */
+        public void execute() throws IOException {
+            this.run();
+        }
+
+        @Override
+        public String[] getExecString() {
+            return command.clone();
+        }
+
+        @Override
+        protected void parseExecResult(BufferedReader lines) throws IOException {
+            output = new StringBuffer();
+            char[] buf = new char[512];
+            int nRead;
+            while ((nRead = lines.read(buf, 0, buf.length)) > 0) {
+                output.append(buf, 0, nRead);
+            }
+        }
+
+        /** Get the output of the shell command.*/
+        public String getOutput() {
+            return (output == null) ? "" : output.toString();
+        }
+
+        /**
+         * Returns the commands of this instance.
+         * Arguments with spaces in are presented with quotes round; other
+         * arguments are presented raw
+         *
+         * @return a string representation of the object.
+         */
+        @Override
+        public String toString() {
+            StringBuilder builder = new StringBuilder();
+            String[] args = getExecString();
+            for (String s : args) {
+                if (s.indexOf(' ') >= 0) {
+                    builder.append('"').append(s).append('"');
+                } else {
+                    builder.append(s);
+                }
+                builder.append(' ');
+            }
+            return builder.toString();
+        }
+    }
+
+    /**
+     * To check if the passed script to shell command executor timed out or
+     * not.
+     *
+     * @return if the script timed out.
+     */
+    public boolean isTimedOut() {
+        return timedOut.get();
+    }
+
+    /**
+     * Set if the command has timed out.
+     *
+     */
+    private void setTimedOut() {
+        this.timedOut.set(true);
+    }
+
+    /**
+     * Static method to execute a shell command.
+     * Covers most of the simple cases without requiring the user to implement
+     * the <code>Shell</code> interface.
+     * @param cmd shell command to execute.
+     * @return the output of the executed command.
+     */
+    public static String execCommand(String... cmd) throws IOException {
+        return execCommand(null, cmd, 0L);
+    }
+
+    /**
+     * Static method to execute a shell command.
+     * Covers most of the simple cases without requiring the user to implement
+     * the <code>Shell</code> interface.
+     * @param env the map of environment key=value
+     * @param cmd shell command to execute.
+     * @param timeout time in milliseconds after which script should be marked timeout
+     * @return the output of the executed command.o
+     */
+
+    public static String execCommand(Map<String, String> env, String[] cmd, long timeout) throws IOException {
+        ShellCommandExecutor exec = new ShellCommandExecutor(cmd, null, env, timeout);
+        exec.execute();
+        return exec.getOutput();
+    }
+
+    /**
+     * Static method to execute a shell command.
+     * Covers most of the simple cases without requiring the user to implement
+     * the <code>Shell</code> interface.
+     * @param env the map of environment key=value
+     * @param cmd shell command to execute.
+     * @return the output of the executed command.
+     */
+    public static String execCommand(Map<String, String> env, String... cmd) throws IOException {
+        return execCommand(env, cmd, 0L);
+    }
+
+    /**
+     * Timer which is used to timeout scripts spawned off by shell.
+     */
+    private static class ShellTimeoutTimerTask extends TimerTask {
+
+        private Shell shell;
+
+        public ShellTimeoutTimerTask(Shell shell) {
+            this.shell = shell;
+        }
+
+        @Override
+        public void run() {
+            Process p = shell.getProcess();
+            try {
+                p.exitValue();
+            } catch (Exception e) {
+                //Process has not terminated.
+                //So check if it has completed
+                //if not just destroy it.
+                if (p != null && !shell.completed.get()) {
+                    shell.setTimedOut();
+                    p.destroy();
+                }
+            }
+        }
+    }
+}

Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java?rev=1492274&r1=1492273&r2=1492274&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java (original)
+++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java Wed Jun 12 16:41:47 2013
@@ -279,7 +279,7 @@ public class LedgerRecoveryTest extends 
                                 public void openComplete(int rc, LedgerHandle lh, Object ctx) {
                                     returnCode.set(rc);
                                     openLatch.countDown();
-                                    if (rc != BKException.Code.OK) {
+                                    if (rc == BKException.Code.OK) {
                                         try {
                                             lh.close();
                                         } catch (Exception e) {