You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/10/13 05:51:07 UTC
[3/5] bookkeeper git commit: BOOKKEEPER-612: Region aware placement
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetUtils.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetUtils.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetUtils.java
index bcc880c..d954d04 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetUtils.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetUtils.java
@@ -15,16 +15,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-// This code has been copied from hadoop-common 0.23.1
package org.apache.bookkeeper.net;
import java.net.InetAddress;
+import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
public class NetUtils {
+ private static final Logger logger = LoggerFactory.getLogger(NetUtils.class);
/**
* Given a string representation of a host, return its ip address
@@ -58,4 +62,24 @@ public class NetUtils {
return hostNames;
}
+ public static String resolveNetworkLocation(DNSToSwitchMapping dnsResolver, InetSocketAddress addr) {
+ List<String> names = new ArrayList<String>(1);
+ if (dnsResolver instanceof CachedDNSToSwitchMapping) {
+ names.add(addr.getAddress().getHostAddress());
+ } else {
+ names.add(addr.getHostName());
+ }
+ // resolve network addresses
+ List<String> rNames = dnsResolver.resolve(names);
+ String netLoc;
+ if (null == rNames) {
+ logger.warn("Failed to resolve network location for {}, using default rack for them : {}.", names,
+ NetworkTopology.DEFAULT_RACK);
+ netLoc = NetworkTopology.DEFAULT_RACK;
+ } else {
+ netLoc = rNames.get(0);
+ }
+ return netLoc;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopology.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopology.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopology.java
index 26abc96..18f3ec9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopology.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopology.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,863 +15,64 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-// This code has been copied from hadoop-common 0.23.1
package org.apache.bookkeeper.net;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Random;
import java.util.Set;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
- * The class represents a cluster of computer with a tree hierarchical
- * network topology.
- * For example, a cluster may be consists of many data centers filled
- * with racks of computers.
- * In a network topology, leaves represent data nodes (computers) and inner
- * nodes represent switches/routers that manage traffic in/out of data centers
- * or racks.
- *
+ * Network Topology Interface
*/
-public class NetworkTopology {
-
- public final static String DEFAULT_RACK = "/default-rack";
- public final static int DEFAULT_HOST_LEVEL = 2;
- public static final Logger LOG = LoggerFactory.getLogger(NetworkTopology.class);
-
- public static class InvalidTopologyException extends RuntimeException {
- private static final long serialVersionUID = 1L;
-
- public InvalidTopologyException(String msg) {
- super(msg);
- }
- }
-
- /** InnerNode represents a switch/router of a data center or rack.
- * Different from a leaf node, it has non-null children.
- */
- static class InnerNode extends NodeBase {
- protected List<Node> children = new ArrayList<Node>();
- private int numOfLeaves;
-
- /** Construct an InnerNode from a path-like string */
- InnerNode(String path) {
- super(path);
- }
-
- /** Construct an InnerNode from its name and its network location */
- InnerNode(String name, String location) {
- super(name, location);
- }
-
- /** Construct an InnerNode
- * from its name, its network location, its parent, and its level */
- InnerNode(String name, String location, InnerNode parent, int level) {
- super(name, location, parent, level);
- }
-
- /** @return its children */
- List<Node> getChildren() {
- return children;
- }
-
- /** @return the number of children this node has */
- int getNumOfChildren() {
- return children.size();
- }
-
- /** Judge if this node represents a rack
- * @return true if it has no child or its children are not InnerNodes
- */
- boolean isRack() {
- if (children.isEmpty()) {
- return true;
- }
-
- Node firstChild = children.get(0);
- if (firstChild instanceof InnerNode) {
- return false;
- }
-
- return true;
- }
-
- /** Judge if this node is an ancestor of node <i>n</i>
- *
- * @param n a node
- * @return true if this node is an ancestor of <i>n</i>
- */
- boolean isAncestor(Node n) {
- return getPath(this).equals(NodeBase.PATH_SEPARATOR_STR)
- || (n.getNetworkLocation() + NodeBase.PATH_SEPARATOR_STR).startsWith(getPath(this)
- + NodeBase.PATH_SEPARATOR_STR);
- }
-
- /** Judge if this node is the parent of node <i>n</i>
- *
- * @param n a node
- * @return true if this node is the parent of <i>n</i>
- */
- boolean isParent(Node n) {
- return n.getNetworkLocation().equals(getPath(this));
- }
-
- /* Return a child name of this node who is an ancestor of node <i>n</i> */
- private String getNextAncestorName(Node n) {
- if (!isAncestor(n)) {
- throw new IllegalArgumentException(this + "is not an ancestor of " + n);
- }
- String name = n.getNetworkLocation().substring(getPath(this).length());
- if (name.charAt(0) == PATH_SEPARATOR) {
- name = name.substring(1);
- }
- int index = name.indexOf(PATH_SEPARATOR);
- if (index != -1)
- name = name.substring(0, index);
- return name;
- }
-
- /** Add node <i>n</i> to the subtree of this node
- * @param n node to be added
- * @return true if the node is added; false otherwise
- */
- boolean add(Node n) {
- if (!isAncestor(n))
- throw new IllegalArgumentException(n.getName() + ", which is located at " + n.getNetworkLocation()
- + ", is not a decendent of " + getPath(this));
- if (isParent(n)) {
- // this node is the parent of n; add n directly
- n.setParent(this);
- n.setLevel(this.level + 1);
- for (int i = 0; i < children.size(); i++) {
- if (children.get(i).getName().equals(n.getName())) {
- children.set(i, n);
- return false;
- }
- }
- children.add(n);
- numOfLeaves++;
- return true;
- } else {
- // find the next ancestor node
- String parentName = getNextAncestorName(n);
- InnerNode parentNode = null;
- for (int i = 0; i < children.size(); i++) {
- if (children.get(i).getName().equals(parentName)) {
- parentNode = (InnerNode) children.get(i);
- break;
- }
- }
- if (parentNode == null) {
- // create a new InnerNode
- parentNode = createParentNode(parentName);
- children.add(parentNode);
- }
- // add n to the subtree of the next ancestor node
- if (parentNode.add(n)) {
- numOfLeaves++;
- return true;
- } else {
- return false;
- }
- }
- }
-
- /**
- * Creates a parent node to be added to the list of children.
- * Creates a node using the InnerNode four argument constructor specifying
- * the name, location, parent, and level of this node.
- *
- * <p>To be overridden in subclasses for specific InnerNode implementations,
- * as alternative to overriding the full {@link #add(Node)} method.
- *
- * @param parentName The name of the parent node
- * @return A new inner node
- * @see InnerNode#InnerNode(String, String, InnerNode, int)
- */
- protected InnerNode createParentNode(String parentName) {
- return new InnerNode(parentName, getPath(this), this, this.getLevel() + 1);
- }
+public interface NetworkTopology {
- /** Remove node <i>n</i> from the subtree of this node
- * @param n node to be deleted
- * @return true if the node is deleted; false otherwise
- */
- boolean remove(Node n) {
- String parent = n.getNetworkLocation();
- String currentPath = getPath(this);
- if (!isAncestor(n))
- throw new IllegalArgumentException(n.getName() + ", which is located at " + parent
- + ", is not a descendent of " + currentPath);
- if (isParent(n)) {
- // this node is the parent of n; remove n directly
- for (int i = 0; i < children.size(); i++) {
- if (children.get(i).getName().equals(n.getName())) {
- children.remove(i);
- numOfLeaves--;
- n.setParent(null);
- return true;
- }
- }
- return false;
- } else {
- // find the next ancestor node: the parent node
- String parentName = getNextAncestorName(n);
- InnerNode parentNode = null;
- int i;
- for (i = 0; i < children.size(); i++) {
- if (children.get(i).getName().equals(parentName)) {
- parentNode = (InnerNode) children.get(i);
- break;
- }
- }
- if (parentNode == null) {
- return false;
- }
- // remove n from the parent node
- boolean isRemoved = parentNode.remove(n);
- // if the parent node has no children, remove the parent node too
- if (isRemoved) {
- if (parentNode.getNumOfChildren() == 0) {
- children.remove(i);
- }
- numOfLeaves--;
- }
- return isRemoved;
- }
- } // end of remove
-
- /** Given a node's string representation, return a reference to the node
- * @param loc string location of the form /rack/node
- * @return null if the node is not found or the childnode is there but
- * not an instance of {@link InnerNode}
- */
- private Node getLoc(String loc) {
- if (loc == null || loc.length() == 0)
- return this;
-
- String[] path = loc.split(PATH_SEPARATOR_STR, 2);
- Node childnode = null;
- for (int i = 0; i < children.size(); i++) {
- if (children.get(i).getName().equals(path[0])) {
- childnode = children.get(i);
- }
- }
- if (childnode == null)
- return null; // non-existing node
- if (path.length == 1)
- return childnode;
- if (childnode instanceof InnerNode) {
- return ((InnerNode) childnode).getLoc(path[1]);
- } else {
- return null;
- }
- }
-
- /** get <i>leafIndex</i> leaf of this subtree
- * if it is not in the <i>excludedNode</i>
- *
- * @param leafIndex an indexed leaf of the node
- * @param excludedNode an excluded node (can be null)
- * @return
- */
- Node getLeaf(int leafIndex, Node excludedNode) {
- int count = 0;
- // check if the excluded node a leaf
- boolean isLeaf = excludedNode == null || !(excludedNode instanceof InnerNode);
- // calculate the total number of excluded leaf nodes
- int numOfExcludedLeaves = isLeaf ? 1 : ((InnerNode) excludedNode).getNumOfLeaves();
- if (isLeafParent()) { // children are leaves
- if (isLeaf) { // excluded node is a leaf node
- int excludedIndex = children.indexOf(excludedNode);
- if (excludedIndex != -1 && leafIndex >= 0) {
- // excluded node is one of the children so adjust the leaf index
- leafIndex = leafIndex >= excludedIndex ? leafIndex + 1 : leafIndex;
- }
- }
- // range check
- if (leafIndex < 0 || leafIndex >= this.getNumOfChildren()) {
- return null;
- }
- return children.get(leafIndex);
- } else {
- for (int i = 0; i < children.size(); i++) {
- InnerNode child = (InnerNode) children.get(i);
- if (excludedNode == null || excludedNode != child) {
- // not the excludedNode
- int numOfLeaves = child.getNumOfLeaves();
- if (excludedNode != null && child.isAncestor(excludedNode)) {
- numOfLeaves -= numOfExcludedLeaves;
- }
- if (count + numOfLeaves > leafIndex) {
- // the leaf is in the child subtree
- return child.getLeaf(leafIndex - count, excludedNode);
- } else {
- // go to the next child
- count = count + numOfLeaves;
- }
- } else { // it is the excluededNode
- // skip it and set the excludedNode to be null
- excludedNode = null;
- }
- }
- return null;
- }
- }
-
- protected boolean isLeafParent() {
- return isRack();
- }
-
- /**
- * Determine if children a leaves, default implementation calls {@link #isRack()}
- * <p>To be overridden in subclasses for specific InnerNode implementations,
- * as alternative to overriding the full {@link #getLeaf(int, Node)} method.
- *
- * @return true if children are leaves, false otherwise
- */
- protected boolean areChildrenLeaves() {
- return isRack();
- }
-
- /**
- * Get number of leaves.
- */
- int getNumOfLeaves() {
- return numOfLeaves;
- }
- } // end of InnerNode
-
- /**
- * the root cluster map
- */
- InnerNode clusterMap;
- /** Depth of all leaf nodes */
- private int depthOfAllLeaves = -1;
- /** rack counter */
- protected int numOfRacks = 0;
- /** the lock used to manage access */
- protected ReadWriteLock netlock = new ReentrantReadWriteLock();
-
- public NetworkTopology() {
- clusterMap = new InnerNode(InnerNode.ROOT);
- }
-
- /** Add a leaf node
- * Update node counter & rack counter if necessary
- * @param node node to be added; can be null
- * @exception IllegalArgumentException if add a node to a leave
- or node to be added is not a leaf
- */
- public void add(Node node) {
- if (node == null)
- return;
- String oldTopoStr = this.toString();
- if (node instanceof InnerNode) {
- throw new IllegalArgumentException("Not allow to add an inner node: " + NodeBase.getPath(node));
- }
- int newDepth = NodeBase.locationToDepth(node.getNetworkLocation()) + 1;
- netlock.writeLock().lock();
- try {
- if ((depthOfAllLeaves != -1) && (depthOfAllLeaves != newDepth)) {
- LOG.error("Error: can't add leaf node at depth " + newDepth + " to topology:\n" + oldTopoStr);
- throw new InvalidTopologyException("Invalid network topology. "
- + "You cannot have a rack and a non-rack node at the same level of the network topology.");
- }
- Node rack = getNodeForNetworkLocation(node);
- if (rack != null && !(rack instanceof InnerNode)) {
- throw new IllegalArgumentException("Unexpected data node " + node.toString()
- + " at an illegal network location");
- }
- if (clusterMap.add(node)) {
- LOG.info("Adding a new node: " + NodeBase.getPath(node));
- if (rack == null) {
- numOfRacks++;
- }
- if (!(node instanceof InnerNode)) {
- if (depthOfAllLeaves == -1) {
- depthOfAllLeaves = node.getLevel();
- }
- }
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("NetworkTopology became:\n" + this.toString());
- }
- } finally {
- netlock.writeLock().unlock();
- }
- }
+ public final static String DEFAULT_REGION = "/default-region";
+ public final static String DEFAULT_RACK = "/default-region/default-rack";
/**
- * Return a reference to the node given its string representation.
- * Default implementation delegates to {@link #getNode(String)}.
- *
- * <p>To be overridden in subclasses for specific NetworkTopology
- * implementations, as alternative to overriding the full {@link #add(Node)}
- * method.
+ * Add a node to the network topology
*
- * @param node The string representation of this node's network location is
- * used to retrieve a Node object.
- * @return a reference to the node; null if the node is not in the tree
- *
- * @see #add(Node)
- * @see #getNode(String)
+ * @param node
+ * add the node to network topology
*/
- protected Node getNodeForNetworkLocation(Node node) {
- return getNode(node.getNetworkLocation());
- }
+ void add(Node node);
/**
- * Given a string representation of a rack, return its children
- * @param loc a path-like string representation of a rack
- * @return a newly allocated list with all the node's children
- */
- public List<Node> getDatanodesInRack(String loc) {
- netlock.readLock().lock();
- try {
- loc = NodeBase.normalize(loc);
- if (!NodeBase.ROOT.equals(loc)) {
- loc = loc.substring(1);
- }
- InnerNode rack = (InnerNode) clusterMap.getLoc(loc);
- if (rack == null) {
- return null;
- }
- return new ArrayList<Node>(rack.getChildren());
- } finally {
- netlock.readLock().unlock();
- }
- }
-
- /** Remove a node
- * Update node counter and rack counter if necessary
- * @param node node to be removed; can be null
- */
- public void remove(Node node) {
- if (node == null)
- return;
- if (node instanceof InnerNode) {
- throw new IllegalArgumentException("Not allow to remove an inner node: " + NodeBase.getPath(node));
- }
- LOG.info("Removing a node: " + NodeBase.getPath(node));
- netlock.writeLock().lock();
- try {
- if (clusterMap.remove(node)) {
- InnerNode rack = (InnerNode) getNode(node.getNetworkLocation());
- if (rack == null) {
- numOfRacks--;
- }
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("NetworkTopology became:\n" + this.toString());
- }
- } finally {
- netlock.writeLock().unlock();
- }
- }
-
- /** Check if the tree contains node <i>node</i>
+ * Remove a node from nework topology
*
- * @param node a node
- * @return true if <i>node</i> is already in the tree; false otherwise
+ * @param node
+ * remove the node from network topology
*/
- public boolean contains(Node node) {
- if (node == null)
- return false;
- netlock.readLock().lock();
- try {
- Node parent = node.getParent();
- for (int level = node.getLevel(); parent != null && level > 0; parent = parent.getParent(), level--) {
- if (parent == clusterMap) {
- return true;
- }
- }
- } finally {
- netlock.readLock().unlock();
- }
- return false;
- }
-
- /** Given a string representation of a node, return its reference
- *
- * @param loc
- * a path-like string representation of a node
- * @return a reference to the node; null if the node is not in the tree
- */
- public Node getNode(String loc) {
- netlock.readLock().lock();
- try {
- loc = NodeBase.normalize(loc);
- if (!NodeBase.ROOT.equals(loc))
- loc = loc.substring(1);
- return clusterMap.getLoc(loc);
- } finally {
- netlock.readLock().unlock();
- }
- }
-
- /** Given a string representation of a rack for a specific network
- * location
- *
- * To be overridden in subclasses for specific NetworkTopology
- * implementations, as alternative to overriding the full
- * {@link #getRack(String)} method.
- * @param loc
- * a path-like string representation of a network location
- * @return a rack string
- */
- public String getRack(String loc) {
- return loc;
- }
-
- /** @return the total number of racks */
- public int getNumOfRacks() {
- netlock.readLock().lock();
- try {
- return numOfRacks;
- } finally {
- netlock.readLock().unlock();
- }
- }
-
- /** @return the total number of leaf nodes */
- public int getNumOfLeaves() {
- netlock.readLock().lock();
- try {
- return clusterMap.getNumOfLeaves();
- } finally {
- netlock.readLock().unlock();
- }
- }
-
- /** Return the distance between two nodes
- * It is assumed that the distance from one node to its parent is 1
- * The distance between two nodes is calculated by summing up their distances
- * to their closest common ancestor.
- * @param node1 one node
- * @param node2 another node
- * @return the distance between node1 and node2 which is zero if they are the same
- * or {@link Integer#MAX_VALUE} if node1 or node2 do not belong to the cluster
- */
- public int getDistance(Node node1, Node node2) {
- if (node1 == node2) {
- return 0;
- }
- Node n1 = node1, n2 = node2;
- int dis = 0;
- netlock.readLock().lock();
- try {
- int level1 = node1.getLevel(), level2 = node2.getLevel();
- while (n1 != null && level1 > level2) {
- n1 = n1.getParent();
- level1--;
- dis++;
- }
- while (n2 != null && level2 > level1) {
- n2 = n2.getParent();
- level2--;
- dis++;
- }
- while (n1 != null && n2 != null && n1.getParent() != n2.getParent()) {
- n1 = n1.getParent();
- n2 = n2.getParent();
- dis += 2;
- }
- } finally {
- netlock.readLock().unlock();
- }
- if (n1 == null) {
- LOG.warn("The cluster does not contain node: " + NodeBase.getPath(node1));
- return Integer.MAX_VALUE;
- }
- if (n2 == null) {
- LOG.warn("The cluster does not contain node: " + NodeBase.getPath(node2));
- return Integer.MAX_VALUE;
- }
- return dis + 2;
- }
-
- /** Check if two nodes are on the same rack
- * @param node1 one node (can be null)
- * @param node2 another node (can be null)
- * @return true if node1 and node2 are on the same rack; false otherwise
- * @exception IllegalArgumentException when either node1 or node2 is null, or
- * node1 or node2 do not belong to the cluster
- */
- public boolean isOnSameRack(Node node1, Node node2) {
- if (node1 == null || node2 == null) {
- return false;
- }
-
- netlock.readLock().lock();
- try {
- return isSameParents(node1, node2);
- } finally {
- netlock.readLock().unlock();
- }
- }
-
- /**
- * Check if network topology is aware of NodeGroup
- */
- public boolean isNodeGroupAware() {
- return false;
- }
+ void remove(Node node);
/**
- * Return false directly as not aware of NodeGroup, to be override in sub-class
- */
- public boolean isOnSameNodeGroup(Node node1, Node node2) {
- return false;
- }
-
- /**
- * Compare the parents of each node for equality
- *
- * <p>To be overridden in subclasses for specific NetworkTopology
- * implementations, as alternative to overriding the full
- * {@link #isOnSameRack(Node, Node)} method.
+ * Check if the tree contains node <i>node</i>.
*
- * @param node1 the first node to compare
- * @param node2 the second node to compare
- * @return true if their parents are equal, false otherwise
- *
- * @see #isOnSameRack(Node, Node)
- */
- protected boolean isSameParents(Node node1, Node node2) {
- return node1.getParent() == node2.getParent();
- }
-
- final protected static Random r = new Random();
-
- /** randomly choose one node from <i>scope</i>
- * if scope starts with ~, choose one from the all nodes except for the
- * ones in <i>scope</i>; otherwise, choose one from <i>scope</i>
- * @param scope range of nodes from which a node will be chosen
- * @return the chosen node
- */
- public Node chooseRandom(String scope) {
- netlock.readLock().lock();
- try {
- if (scope.startsWith("~")) {
- return chooseRandom(NodeBase.ROOT, scope.substring(1));
- } else {
- return chooseRandom(scope, null);
- }
- } finally {
- netlock.readLock().unlock();
- }
- }
-
- private Node chooseRandom(String scope, String excludedScope) {
- if (excludedScope != null) {
- if (scope.startsWith(excludedScope)) {
- return null;
- }
- if (!excludedScope.startsWith(scope)) {
- excludedScope = null;
- }
- }
- Node node = getNode(scope);
- if (!(node instanceof InnerNode)) {
- return node;
- }
- InnerNode innerNode = (InnerNode) node;
- int numOfDatanodes = innerNode.getNumOfLeaves();
- if (excludedScope == null) {
- node = null;
- } else {
- node = getNode(excludedScope);
- if (!(node instanceof InnerNode)) {
- numOfDatanodes -= 1;
- } else {
- numOfDatanodes -= ((InnerNode) node).getNumOfLeaves();
- }
- }
- int leaveIndex = r.nextInt(numOfDatanodes);
- return innerNode.getLeaf(leaveIndex, node);
- }
-
- /** return leaves in <i>scope</i>
- * @param scope a path string
- * @return leaves nodes under specific scope
- */
- private Set<Node> doGetLeaves(String scope) {
- Node node = getNode(scope);
- Set<Node> leafNodes = new HashSet<Node>();
- if (!(node instanceof InnerNode)) {
- leafNodes.add(node);
- } else {
- InnerNode innerNode = (InnerNode) node;
- for (int i = 0; i < innerNode.getNumOfLeaves(); i++) {
- leafNodes.add(innerNode.getLeaf(i, null));
- }
- }
- return leafNodes;
- }
-
- public Set<Node> getLeaves(String scope) {
- netlock.readLock().lock();
- try {
- if (scope.startsWith("~")) {
- Set<Node> allNodes = doGetLeaves(NodeBase.ROOT);
- Set<Node> excludeNodes = doGetLeaves(scope.substring(1));
- allNodes.removeAll(excludeNodes);
- return allNodes;
- } else {
- return doGetLeaves(scope);
- }
- } finally {
- netlock.readLock().unlock();
- }
- }
-
- /** return the number of leaves in <i>scope</i> but not in <i>excludedNodes</i>
- * if scope starts with ~, return the number of nodes that are not
- * in <i>scope</i> and <i>excludedNodes</i>;
- * @param scope a path string that may start with ~
- * @param excludedNodes a list of nodes
- * @return number of available nodes
+ * @param node
+ * node to check
+ * @return true if <i>node</i> is already in the network topology, otherwise false.
*/
- public int countNumOfAvailableNodes(String scope, Collection<Node> excludedNodes) {
- boolean isExcluded = false;
- if (scope.startsWith("~")) {
- isExcluded = true;
- scope = scope.substring(1);
- }
- scope = NodeBase.normalize(scope);
- int count = 0; // the number of nodes in both scope & excludedNodes
- netlock.readLock().lock();
- try {
- for (Node node : excludedNodes) {
- if ((NodeBase.getPath(node) + NodeBase.PATH_SEPARATOR_STR).startsWith(scope
- + NodeBase.PATH_SEPARATOR_STR)) {
- count++;
- }
- }
- Node n = getNode(scope);
- int scopeNodeCount = 1;
- if (n instanceof InnerNode) {
- scopeNodeCount = ((InnerNode) n).getNumOfLeaves();
- }
- if (isExcluded) {
- return clusterMap.getNumOfLeaves() - scopeNodeCount - excludedNodes.size() + count;
- } else {
- return scopeNodeCount - count;
- }
- } finally {
- netlock.readLock().unlock();
- }
- }
-
- /** convert a network tree to a string */
- @Override
- public String toString() {
- // print the number of racks
- StringBuilder tree = new StringBuilder();
- tree.append("Number of racks: ");
- tree.append(numOfRacks);
- tree.append("\n");
- // print the number of leaves
- int numOfLeaves = getNumOfLeaves();
- tree.append("Expected number of leaves:");
- tree.append(numOfLeaves);
- tree.append("\n");
- // print nodes
- for (int i = 0; i < numOfLeaves; i++) {
- tree.append(NodeBase.getPath(clusterMap.getLeaf(i, null)));
- tree.append("\n");
- }
- return tree.toString();
- }
+ boolean contains(Node node);
/**
- * Divide networklocation string into two parts by last separator, and get
- * the first part here.
- *
- * @param networkLocation
+ * Retrieve a node from the network topology
+ * @param loc
* @return
*/
- public static String getFirstHalf(String networkLocation) {
- int index = networkLocation.lastIndexOf(NodeBase.PATH_SEPARATOR_STR);
- return networkLocation.substring(0, index);
- }
+ Node getNode(String loc);
/**
- * Divide networklocation string into two parts by last separator, and get
- * the second part here.
+ * Returns number of racks in the network topology.
*
- * @param networkLocation
- * @return
+ * @return number of racks in the network topology.
*/
- public static String getLastHalf(String networkLocation) {
- int index = networkLocation.lastIndexOf(NodeBase.PATH_SEPARATOR_STR);
- return networkLocation.substring(index);
- }
-
- /** swap two array items */
- static protected void swap(Node[] nodes, int i, int j) {
- Node tempNode;
- tempNode = nodes[j];
- nodes[j] = nodes[i];
- nodes[i] = tempNode;
- }
+ int getNumOfRacks();
- /** Sort nodes array by their distances to <i>reader</i>
- * It linearly scans the array, if a local node is found, swap it with
- * the first element of the array.
- * If a local rack node is found, swap it with the first element following
- * the local node.
- * If neither local node or local rack node is found, put a random replica
- * location at position 0.
- * It leaves the rest nodes untouched.
- * @param reader the node that wishes to read a block from one of the nodes
- * @param nodes the list of nodes containing data for the reader
+ /**
+ * Returns the nodes under a location.
+ *
+ * @param loc
+ * network location
+ * @return nodes under a location
*/
- public void pseudoSortByDistance(Node reader, Node[] nodes) {
- int tempIndex = 0;
- int localRackNode = -1;
- if (reader != null) {
- //scan the array to find the local node & local rack node
- for (int i = 0; i < nodes.length; i++) {
- if (tempIndex == 0 && reader == nodes[i]) { //local node
- //swap the local node and the node at position 0
- if (i != 0) {
- swap(nodes, tempIndex, i);
- }
- tempIndex = 1;
- if (localRackNode != -1) {
- if (localRackNode == 0) {
- localRackNode = i;
- }
- break;
- }
- } else if (localRackNode == -1 && isOnSameRack(reader, nodes[i])) {
- //local rack
- localRackNode = i;
- if (tempIndex != 0)
- break;
- }
- }
-
- // swap the local rack node and the node at position tempIndex
- if (localRackNode != -1 && localRackNode != tempIndex) {
- swap(nodes, tempIndex, localRackNode);
- tempIndex++;
- }
- }
-
- // put a random node at position 0 if it is not a local/local-rack node
- if (tempIndex == 0 && localRackNode == -1 && nodes.length != 0) {
- swap(nodes, 0, r.nextInt(nodes.length));
- }
- }
+ Set<Node> getLeaves(String loc);
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopologyImpl.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopologyImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopologyImpl.java
new file mode 100644
index 0000000..78c4fe4
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NetworkTopologyImpl.java
@@ -0,0 +1,880 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.net;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The class represents a cluster of computer with a tree hierarchical
+ * network topology.
+ * For example, a cluster may be consists of many data centers filled
+ * with racks of computers.
+ * In a network topology, leaves represent data nodes (computers) and inner
+ * nodes represent switches/routers that manage traffic in/out of data centers
+ * or racks.
+ *
+ */
+public class NetworkTopologyImpl implements NetworkTopology {
+
+ public final static int DEFAULT_HOST_LEVEL = 2;
+ public static final Logger LOG = LoggerFactory.getLogger(NetworkTopologyImpl.class);
+
+ public static class InvalidTopologyException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+
+ public InvalidTopologyException(String msg) {
+ super(msg);
+ }
+ }
+
+ /** InnerNode represents a switch/router of a data center or rack.
+ * Different from a leaf node, it has non-null children.
+ */
+ static class InnerNode extends NodeBase {
+ protected List<Node> children = new ArrayList<Node>();
+ private int numOfLeaves;
+
+ /** Construct an InnerNode from a path-like string */
+ InnerNode(String path) {
+ super(path);
+ }
+
+ /** Construct an InnerNode from its name and its network location */
+ InnerNode(String name, String location) {
+ super(name, location);
+ }
+
+ /** Construct an InnerNode
+ * from its name, its network location, its parent, and its level */
+ InnerNode(String name, String location, InnerNode parent, int level) {
+ super(name, location, parent, level);
+ }
+
+ /** @return its children */
+ List<Node> getChildren() {
+ return children;
+ }
+
+ /** @return the number of children this node has */
+ int getNumOfChildren() {
+ return children.size();
+ }
+
+ /** Judge if this node represents a rack
+ * @return true if it has no child or its children are not InnerNodes
+ */
+ boolean isRack() {
+ if (children.isEmpty()) {
+ return true;
+ }
+
+ Node firstChild = children.get(0);
+ if (firstChild instanceof InnerNode) {
+ return false;
+ }
+
+ return true;
+ }
+
+ /** Judge if this node is an ancestor of node <i>n</i>
+ *
+ * @param n a node
+ * @return true if this node is an ancestor of <i>n</i>
+ */
+ boolean isAncestor(Node n) {
+ return getPath(this).equals(NodeBase.PATH_SEPARATOR_STR)
+ || (n.getNetworkLocation() + NodeBase.PATH_SEPARATOR_STR).startsWith(getPath(this)
+ + NodeBase.PATH_SEPARATOR_STR);
+ }
+
+ /** Judge if this node is the parent of node <i>n</i>
+ *
+ * @param n a node
+ * @return true if this node is the parent of <i>n</i>
+ */
+ boolean isParent(Node n) {
+ return n.getNetworkLocation().equals(getPath(this));
+ }
+
+ /* Return a child name of this node who is an ancestor of node <i>n</i> */
+ private String getNextAncestorName(Node n) {
+ if (!isAncestor(n)) {
+ throw new IllegalArgumentException(this + "is not an ancestor of " + n);
+ }
+ String name = n.getNetworkLocation().substring(getPath(this).length());
+ if (name.charAt(0) == PATH_SEPARATOR) {
+ name = name.substring(1);
+ }
+ int index = name.indexOf(PATH_SEPARATOR);
+ if (index != -1)
+ name = name.substring(0, index);
+ return name;
+ }
+
+ /** Add node <i>n</i> to the subtree of this node
+ * @param n node to be added
+ * @return true if the node is added; false otherwise
+ */
+ boolean add(Node n) {
+ if (!isAncestor(n))
+ throw new IllegalArgumentException(n.getName() + ", which is located at " + n.getNetworkLocation()
+ + ", is not a decendent of " + getPath(this));
+ if (isParent(n)) {
+ // this node is the parent of n; add n directly
+ n.setParent(this);
+ n.setLevel(this.level + 1);
+ for (int i = 0; i < children.size(); i++) {
+ if (children.get(i).getName().equals(n.getName())) {
+ children.set(i, n);
+ return false;
+ }
+ }
+ children.add(n);
+ numOfLeaves++;
+ return true;
+ } else {
+ // find the next ancestor node
+ String parentName = getNextAncestorName(n);
+ InnerNode parentNode = null;
+ for (int i = 0; i < children.size(); i++) {
+ if (children.get(i).getName().equals(parentName)) {
+ parentNode = (InnerNode) children.get(i);
+ break;
+ }
+ }
+ if (parentNode == null) {
+ // create a new InnerNode
+ parentNode = createParentNode(parentName);
+ children.add(parentNode);
+ }
+ // add n to the subtree of the next ancestor node
+ if (parentNode.add(n)) {
+ numOfLeaves++;
+ return true;
+ } else {
+ return false;
+ }
+ }
+ }
+
+ /**
+ * Creates a parent node to be added to the list of children.
+ * Creates a node using the InnerNode four argument constructor specifying
+ * the name, location, parent, and level of this node.
+ *
+ * <p>To be overridden in subclasses for specific InnerNode implementations,
+ * as alternative to overriding the full {@link #add(Node)} method.
+ *
+ * @param parentName The name of the parent node
+ * @return A new inner node
+ * @see InnerNode#InnerNode(String, String, InnerNode, int)
+ */
+ protected InnerNode createParentNode(String parentName) {
+ return new InnerNode(parentName, getPath(this), this, this.getLevel() + 1);
+ }
+
+ /** Remove node <i>n</i> from the subtree of this node
+ * @param n node to be deleted
+ * @return true if the node is deleted; false otherwise
+ */
+ boolean remove(Node n) {
+ String parent = n.getNetworkLocation();
+ String currentPath = getPath(this);
+ if (!isAncestor(n))
+ throw new IllegalArgumentException(n.getName() + ", which is located at " + parent
+ + ", is not a descendent of " + currentPath);
+ if (isParent(n)) {
+ // this node is the parent of n; remove n directly
+ for (int i = 0; i < children.size(); i++) {
+ if (children.get(i).getName().equals(n.getName())) {
+ children.remove(i);
+ numOfLeaves--;
+ n.setParent(null);
+ return true;
+ }
+ }
+ return false;
+ } else {
+ // find the next ancestor node: the parent node
+ String parentName = getNextAncestorName(n);
+ InnerNode parentNode = null;
+ int i;
+ for (i = 0; i < children.size(); i++) {
+ if (children.get(i).getName().equals(parentName)) {
+ parentNode = (InnerNode) children.get(i);
+ break;
+ }
+ }
+ if (parentNode == null) {
+ return false;
+ }
+ // remove n from the parent node
+ boolean isRemoved = parentNode.remove(n);
+ // if the parent node has no children, remove the parent node too
+ if (isRemoved) {
+ if (parentNode.getNumOfChildren() == 0) {
+ children.remove(i);
+ }
+ numOfLeaves--;
+ }
+ return isRemoved;
+ }
+ } // end of remove
+
+ /** Given a node's string representation, return a reference to the node
+ * @param loc string location of the form /rack/node
+ * @return null if the node is not found or the childnode is there but
+ * not an instance of {@link InnerNode}
+ */
+ private Node getLoc(String loc) {
+ if (loc == null || loc.length() == 0)
+ return this;
+
+ String[] path = loc.split(PATH_SEPARATOR_STR, 2);
+ Node childnode = null;
+ for (int i = 0; i < children.size(); i++) {
+ if (children.get(i).getName().equals(path[0])) {
+ childnode = children.get(i);
+ }
+ }
+ if (childnode == null)
+ return null; // non-existing node
+ if (path.length == 1)
+ return childnode;
+ if (childnode instanceof InnerNode) {
+ return ((InnerNode) childnode).getLoc(path[1]);
+ } else {
+ return null;
+ }
+ }
+
+ /** get <i>leafIndex</i> leaf of this subtree
+ * if it is not in the <i>excludedNode</i>
+ *
+ * @param leafIndex an indexed leaf of the node
+ * @param excludedNode an excluded node (can be null)
+ * @return
+ */
+ Node getLeaf(int leafIndex, Node excludedNode) {
+ int count = 0;
+ // check if the excluded node a leaf
+ boolean isLeaf = excludedNode == null || !(excludedNode instanceof InnerNode);
+ // calculate the total number of excluded leaf nodes
+ int numOfExcludedLeaves = isLeaf ? 1 : ((InnerNode) excludedNode).getNumOfLeaves();
+ if (isLeafParent()) { // children are leaves
+ if (isLeaf) { // excluded node is a leaf node
+ int excludedIndex = children.indexOf(excludedNode);
+ if (excludedIndex != -1 && leafIndex >= 0) {
+ // excluded node is one of the children so adjust the leaf index
+ leafIndex = leafIndex >= excludedIndex ? leafIndex + 1 : leafIndex;
+ }
+ }
+ // range check
+ if (leafIndex < 0 || leafIndex >= this.getNumOfChildren()) {
+ return null;
+ }
+ return children.get(leafIndex);
+ } else {
+ for (int i = 0; i < children.size(); i++) {
+ InnerNode child = (InnerNode) children.get(i);
+ if (excludedNode == null || excludedNode != child) {
+ // not the excludedNode
+ int numOfLeaves = child.getNumOfLeaves();
+ if (excludedNode != null && child.isAncestor(excludedNode)) {
+ numOfLeaves -= numOfExcludedLeaves;
+ }
+ if (count + numOfLeaves > leafIndex) {
+ // the leaf is in the child subtree
+ return child.getLeaf(leafIndex - count, excludedNode);
+ } else {
+ // go to the next child
+ count = count + numOfLeaves;
+ }
+ } else { // it is the excluededNode
+ // skip it and set the excludedNode to be null
+ excludedNode = null;
+ }
+ }
+ return null;
+ }
+ }
+
+ protected boolean isLeafParent() {
+ return isRack();
+ }
+
+ /**
+ * Determine if children a leaves, default implementation calls {@link #isRack()}
+ * <p>To be overridden in subclasses for specific InnerNode implementations,
+ * as alternative to overriding the full {@link #getLeaf(int, Node)} method.
+ *
+ * @return true if children are leaves, false otherwise
+ */
+ protected boolean areChildrenLeaves() {
+ return isRack();
+ }
+
+ /**
+ * Get number of leaves.
+ */
+ int getNumOfLeaves() {
+ return numOfLeaves;
+ }
+ } // end of InnerNode
+
+ /**
+ * the root cluster map
+ */
+ InnerNode clusterMap;
+ /** Depth of all leaf nodes */
+ private int depthOfAllLeaves = -1;
+ /** rack counter */
+ protected int numOfRacks = 0;
+ /** the lock used to manage access */
+ protected ReadWriteLock netlock = new ReentrantReadWriteLock();
+
+ public NetworkTopologyImpl() {
+ clusterMap = new InnerNode(InnerNode.ROOT);
+ }
+
+ /** Add a leaf node
+ * Update node counter & rack counter if necessary
+ * @param node node to be added; can be null
+ * @exception IllegalArgumentException if add a node to a leave
+ or node to be added is not a leaf
+ */
+ public void add(Node node) {
+ if (node == null)
+ return;
+ String oldTopoStr = this.toString();
+ if (node instanceof InnerNode) {
+ throw new IllegalArgumentException("Not allow to add an inner node: " + NodeBase.getPath(node));
+ }
+ int newDepth = NodeBase.locationToDepth(node.getNetworkLocation()) + 1;
+ netlock.writeLock().lock();
+ try {
+ if ((depthOfAllLeaves != -1) && (depthOfAllLeaves != newDepth)) {
+ LOG.error("Error: can't add leaf node at depth " + newDepth + " to topology:\n" + oldTopoStr);
+ throw new InvalidTopologyException("Invalid network topology. "
+ + "You cannot have a rack and a non-rack node at the same level of the network topology.");
+ }
+ Node rack = getNodeForNetworkLocation(node);
+ if (rack != null && !(rack instanceof InnerNode)) {
+ throw new IllegalArgumentException("Unexpected data node " + node.toString()
+ + " at an illegal network location");
+ }
+ if (clusterMap.add(node)) {
+ LOG.info("Adding a new node: " + NodeBase.getPath(node));
+ if (rack == null) {
+ numOfRacks++;
+ }
+ if (!(node instanceof InnerNode)) {
+ if (depthOfAllLeaves == -1) {
+ depthOfAllLeaves = node.getLevel();
+ }
+ }
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("NetworkTopology became:\n" + this.toString());
+ }
+ } finally {
+ netlock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Return a reference to the node given its string representation.
+ * Default implementation delegates to {@link #getNode(String)}.
+ *
+ * <p>To be overridden in subclasses for specific NetworkTopology
+ * implementations, as alternative to overriding the full {@link #add(Node)}
+ * method.
+ *
+ * @param node The string representation of this node's network location is
+ * used to retrieve a Node object.
+ * @return a reference to the node; null if the node is not in the tree
+ *
+ * @see #add(Node)
+ * @see #getNode(String)
+ */
+ protected Node getNodeForNetworkLocation(Node node) {
+ return getNode(node.getNetworkLocation());
+ }
+
+ /**
+ * Given a string representation of a rack, return its children
+ * @param loc a path-like string representation of a rack
+ * @return a newly allocated list with all the node's children
+ */
+ public List<Node> getDatanodesInRack(String loc) {
+ netlock.readLock().lock();
+ try {
+ loc = NodeBase.normalize(loc);
+ if (!NodeBase.ROOT.equals(loc)) {
+ loc = loc.substring(1);
+ }
+ InnerNode rack = (InnerNode) clusterMap.getLoc(loc);
+ if (rack == null) {
+ return null;
+ }
+ return new ArrayList<Node>(rack.getChildren());
+ } finally {
+ netlock.readLock().unlock();
+ }
+ }
+
+ /** Remove a node
+ * Update node counter and rack counter if necessary
+ * @param node node to be removed; can be null
+ */
+ @Override
+ public void remove(Node node) {
+ if (node == null)
+ return;
+ if (node instanceof InnerNode) {
+ throw new IllegalArgumentException("Not allow to remove an inner node: " + NodeBase.getPath(node));
+ }
+ LOG.info("Removing a node: " + NodeBase.getPath(node));
+ netlock.writeLock().lock();
+ try {
+ if (clusterMap.remove(node)) {
+ InnerNode rack = (InnerNode) getNode(node.getNetworkLocation());
+ if (rack == null) {
+ numOfRacks--;
+ }
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("NetworkTopology became:\n" + this.toString());
+ }
+ } finally {
+ netlock.writeLock().unlock();
+ }
+ }
+
+ /** Check if the tree contains node <i>node</i>
+ *
+ * @param node a node
+ * @return true if <i>node</i> is already in the tree; false otherwise
+ */
+ @Override
+ public boolean contains(Node node) {
+ if (node == null)
+ return false;
+ netlock.readLock().lock();
+ try {
+ Node parent = node.getParent();
+ for (int level = node.getLevel(); parent != null && level > 0; parent = parent.getParent(), level--) {
+ if (parent == clusterMap) {
+ return true;
+ }
+ }
+ } finally {
+ netlock.readLock().unlock();
+ }
+ return false;
+ }
+
+ /** Given a string representation of a node, return its reference
+ *
+ * @param loc
+ * a path-like string representation of a node
+ * @return a reference to the node; null if the node is not in the tree
+ */
+ @Override
+ public Node getNode(String loc) {
+ netlock.readLock().lock();
+ try {
+ loc = NodeBase.normalize(loc);
+ if (!NodeBase.ROOT.equals(loc))
+ loc = loc.substring(1);
+ return clusterMap.getLoc(loc);
+ } finally {
+ netlock.readLock().unlock();
+ }
+ }
+
+ /** Given a string representation of a rack for a specific network
+ * location
+ *
+ * To be overridden in subclasses for specific NetworkTopology
+ * implementations, as alternative to overriding the full
+ * {@link #getRack(String)} method.
+ * @param loc
+ * a path-like string representation of a network location
+ * @return a rack string
+ */
+ public String getRack(String loc) {
+ return loc;
+ }
+
+ /** @return the total number of racks */
+ @Override
+ public int getNumOfRacks() {
+ netlock.readLock().lock();
+ try {
+ return numOfRacks;
+ } finally {
+ netlock.readLock().unlock();
+ }
+ }
+
+ /** @return the total number of leaf nodes */
+ public int getNumOfLeaves() {
+ netlock.readLock().lock();
+ try {
+ return clusterMap.getNumOfLeaves();
+ } finally {
+ netlock.readLock().unlock();
+ }
+ }
+
+ /** Return the distance between two nodes
+ * It is assumed that the distance from one node to its parent is 1
+ * The distance between two nodes is calculated by summing up their distances
+ * to their closest common ancestor.
+ * @param node1 one node
+ * @param node2 another node
+ * @return the distance between node1 and node2 which is zero if they are the same
+ * or {@link Integer#MAX_VALUE} if node1 or node2 do not belong to the cluster
+ */
+ public int getDistance(Node node1, Node node2) {
+ if (node1 == node2) {
+ return 0;
+ }
+ Node n1 = node1, n2 = node2;
+ int dis = 0;
+ netlock.readLock().lock();
+ try {
+ int level1 = node1.getLevel(), level2 = node2.getLevel();
+ while (n1 != null && level1 > level2) {
+ n1 = n1.getParent();
+ level1--;
+ dis++;
+ }
+ while (n2 != null && level2 > level1) {
+ n2 = n2.getParent();
+ level2--;
+ dis++;
+ }
+ while (n1 != null && n2 != null && n1.getParent() != n2.getParent()) {
+ n1 = n1.getParent();
+ n2 = n2.getParent();
+ dis += 2;
+ }
+ } finally {
+ netlock.readLock().unlock();
+ }
+ if (n1 == null) {
+ LOG.warn("The cluster does not contain node: {}", NodeBase.getPath(node1));
+ return Integer.MAX_VALUE;
+ }
+ if (n2 == null) {
+ LOG.warn("The cluster does not contain node: {}", NodeBase.getPath(node2));
+ return Integer.MAX_VALUE;
+ }
+ return dis + 2;
+ }
+
+ /** Check if two nodes are on the same rack
+ * @param node1 one node (can be null)
+ * @param node2 another node (can be null)
+ * @return true if node1 and node2 are on the same rack; false otherwise
+ * @exception IllegalArgumentException when either node1 or node2 is null, or
+ * node1 or node2 do not belong to the cluster
+ */
+ public boolean isOnSameRack(Node node1, Node node2) {
+ if (node1 == null || node2 == null) {
+ return false;
+ }
+
+ netlock.readLock().lock();
+ try {
+ return isSameParents(node1, node2);
+ } finally {
+ netlock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Check if network topology is aware of NodeGroup
+ */
+ public boolean isNodeGroupAware() {
+ return false;
+ }
+
+ /**
+ * Return false directly as not aware of NodeGroup, to be override in sub-class
+ */
+ public boolean isOnSameNodeGroup(Node node1, Node node2) {
+ return false;
+ }
+
+ /**
+ * Compare the parents of each node for equality
+ *
+ * <p>To be overridden in subclasses for specific NetworkTopology
+ * implementations, as alternative to overriding the full
+ * {@link #isOnSameRack(Node, Node)} method.
+ *
+ * @param node1 the first node to compare
+ * @param node2 the second node to compare
+ * @return true if their parents are equal, false otherwise
+ *
+ * @see #isOnSameRack(Node, Node)
+ */
+ protected boolean isSameParents(Node node1, Node node2) {
+ return node1.getParent() == node2.getParent();
+ }
+
+ final protected static Random r = new Random();
+
+ /** randomly choose one node from <i>scope</i>
+ * if scope starts with ~, choose one from the all nodes except for the
+ * ones in <i>scope</i>; otherwise, choose one from <i>scope</i>
+ * @param scope range of nodes from which a node will be chosen
+ * @return the chosen node
+ */
+ public Node chooseRandom(String scope) {
+ netlock.readLock().lock();
+ try {
+ if (scope.startsWith("~")) {
+ return chooseRandom(NodeBase.ROOT, scope.substring(1));
+ } else {
+ return chooseRandom(scope, null);
+ }
+ } finally {
+ netlock.readLock().unlock();
+ }
+ }
+
+ private Node chooseRandom(String scope, String excludedScope) {
+ if (excludedScope != null) {
+ if (scope.startsWith(excludedScope)) {
+ return null;
+ }
+ if (!excludedScope.startsWith(scope)) {
+ excludedScope = null;
+ }
+ }
+ Node node = getNode(scope);
+ if (!(node instanceof InnerNode)) {
+ return node;
+ }
+ InnerNode innerNode = (InnerNode) node;
+ int numOfDatanodes = innerNode.getNumOfLeaves();
+ if (excludedScope == null) {
+ node = null;
+ } else {
+ node = getNode(excludedScope);
+ if (!(node instanceof InnerNode)) {
+ numOfDatanodes -= 1;
+ } else {
+ numOfDatanodes -= ((InnerNode) node).getNumOfLeaves();
+ }
+ }
+ int leaveIndex = r.nextInt(numOfDatanodes);
+ return innerNode.getLeaf(leaveIndex, node);
+ }
+
+ /** return leaves in <i>scope</i>
+ * @param scope a path string
+ * @return leaves nodes under specific scope
+ */
+ private Set<Node> doGetLeaves(String scope) {
+ Node node = getNode(scope);
+ Set<Node> leafNodes = new HashSet<Node>();
+ if (!(node instanceof InnerNode)) {
+ leafNodes.add(node);
+ } else {
+ InnerNode innerNode = (InnerNode) node;
+ for (int i = 0; i < innerNode.getNumOfLeaves(); i++) {
+ leafNodes.add(innerNode.getLeaf(i, null));
+ }
+ }
+ return leafNodes;
+ }
+
+ @Override
+ public Set<Node> getLeaves(String scope) {
+ netlock.readLock().lock();
+ try {
+ if (scope.startsWith("~")) {
+ Set<Node> allNodes = doGetLeaves(NodeBase.ROOT);
+ Set<Node> excludeNodes = doGetLeaves(scope.substring(1));
+ allNodes.removeAll(excludeNodes);
+ return allNodes;
+ } else {
+ return doGetLeaves(scope);
+ }
+ } finally {
+ netlock.readLock().unlock();
+ }
+ }
+
+ /** return the number of leaves in <i>scope</i> but not in <i>excludedNodes</i>
+ * if scope starts with ~, return the number of nodes that are not
+ * in <i>scope</i> and <i>excludedNodes</i>;
+ * @param scope a path string that may start with ~
+ * @param excludedNodes a list of nodes
+ * @return number of available nodes
+ */
+ public int countNumOfAvailableNodes(String scope, Collection<Node> excludedNodes) {
+ boolean isExcluded = false;
+ if (scope.startsWith("~")) {
+ isExcluded = true;
+ scope = scope.substring(1);
+ }
+ scope = NodeBase.normalize(scope);
+ int count = 0; // the number of nodes in both scope & excludedNodes
+ netlock.readLock().lock();
+ try {
+ for (Node node : excludedNodes) {
+ if ((NodeBase.getPath(node) + NodeBase.PATH_SEPARATOR_STR).startsWith(scope
+ + NodeBase.PATH_SEPARATOR_STR)) {
+ count++;
+ }
+ }
+ Node n = getNode(scope);
+ int scopeNodeCount = 1;
+ if (n instanceof InnerNode) {
+ scopeNodeCount = ((InnerNode) n).getNumOfLeaves();
+ }
+ if (isExcluded) {
+ return clusterMap.getNumOfLeaves() - scopeNodeCount - excludedNodes.size() + count;
+ } else {
+ return scopeNodeCount - count;
+ }
+ } finally {
+ netlock.readLock().unlock();
+ }
+ }
+
+ /** convert a network tree to a string */
+ @Override
+ public String toString() {
+ // print the number of racks
+ StringBuilder tree = new StringBuilder();
+ tree.append("Number of racks: ");
+ tree.append(numOfRacks);
+ tree.append("\n");
+ // print the number of leaves
+ int numOfLeaves = getNumOfLeaves();
+ tree.append("Expected number of leaves:");
+ tree.append(numOfLeaves);
+ tree.append("\n");
+ // print nodes
+ for (int i = 0; i < numOfLeaves; i++) {
+ tree.append(NodeBase.getPath(clusterMap.getLeaf(i, null)));
+ tree.append("\n");
+ }
+ return tree.toString();
+ }
+
+ /**
+ * Divide networklocation string into two parts by last separator, and get
+ * the first part here.
+ *
+ * @param networkLocation
+ * @return
+ */
+ public static String getFirstHalf(String networkLocation) {
+ int index = networkLocation.lastIndexOf(NodeBase.PATH_SEPARATOR_STR);
+ return networkLocation.substring(0, index);
+ }
+
+ /**
+ * Divide networklocation string into two parts by last separator, and get
+ * the second part here.
+ *
+ * @param networkLocation
+ * @return
+ */
+ public static String getLastHalf(String networkLocation) {
+ int index = networkLocation.lastIndexOf(NodeBase.PATH_SEPARATOR_STR);
+ return networkLocation.substring(index);
+ }
+
+ /** swap two array items */
+ static protected void swap(Node[] nodes, int i, int j) {
+ Node tempNode;
+ tempNode = nodes[j];
+ nodes[j] = nodes[i];
+ nodes[i] = tempNode;
+ }
+
+ /** Sort nodes array by their distances to <i>reader</i>
+ * It linearly scans the array, if a local node is found, swap it with
+ * the first element of the array.
+ * If a local rack node is found, swap it with the first element following
+ * the local node.
+ * If neither local node or local rack node is found, put a random replica
+ * location at position 0.
+ * It leaves the rest nodes untouched.
+ * @param reader the node that wishes to read a block from one of the nodes
+ * @param nodes the list of nodes containing data for the reader
+ */
+ public void pseudoSortByDistance(Node reader, Node[] nodes) {
+ int tempIndex = 0;
+ int localRackNode = -1;
+ if (reader != null) {
+ //scan the array to find the local node & local rack node
+ for (int i = 0; i < nodes.length; i++) {
+ if (tempIndex == 0 && reader == nodes[i]) { //local node
+ //swap the local node and the node at position 0
+ if (i != 0) {
+ swap(nodes, tempIndex, i);
+ }
+ tempIndex = 1;
+ if (localRackNode != -1) {
+ if (localRackNode == 0) {
+ localRackNode = i;
+ }
+ break;
+ }
+ } else if (localRackNode == -1 && isOnSameRack(reader, nodes[i])) {
+ //local rack
+ localRackNode = i;
+ if (tempIndex != 0)
+ break;
+ }
+ }
+
+ // swap the local rack node and the node at position tempIndex
+ if (localRackNode != -1 && localRackNode != tempIndex) {
+ swap(nodes, tempIndex, localRackNode);
+ tempIndex++;
+ }
+ }
+
+ // put a random node at position 0 if it is not a local/local-rack node
+ if (tempIndex == 0 && localRackNode == -1 && nodes.length != 0) {
+ swap(nodes, 0, r.nextInt(nodes.length));
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/Node.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/Node.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/Node.java
index f11e0a7..62ec6c0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/Node.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/Node.java
@@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-// This code has been copied from hadoop-common 0.23.1
package org.apache.bookkeeper.net;
import com.google.common.annotations.Beta;
@@ -31,6 +30,9 @@ import com.google.common.annotations.Beta;
*/
@Beta
public interface Node {
+ /** @return the string representation of this node's network location at the specified level in the hierarchy*/
+ public String getNetworkLocation(int level);
+
/** @return the string representation of this node's network location */
public String getNetworkLocation();
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NodeBase.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NodeBase.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NodeBase.java
index f1a4b85..13ddfce 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NodeBase.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/NodeBase.java
@@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-// This code has been copied from hadoop-common 0.23.1
package org.apache.bookkeeper.net;
/** A base class that implements interface Node
@@ -183,4 +182,18 @@ public class NodeBase implements Node {
}
return depth;
}
+
+ @Override
+ public String getNetworkLocation(int distanceFromLeaves) {
+ Node node = this;
+ while (distanceFromLeaves > 1) {
+ Node parent = node.getParent();
+ if (null == parent) {
+ break;
+ }
+ node = parent;
+ distanceFromLeaves--;
+ }
+ return node.getNetworkLocation();
+ }
}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/ScriptBasedMapping.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/ScriptBasedMapping.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/ScriptBasedMapping.java
index 1671cc8..dd8563c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/ScriptBasedMapping.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/ScriptBasedMapping.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-// This code has been copied from hadoop-common 0.23.1
+
package org.apache.bookkeeper.net;
import java.io.File;
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/StabilizeNetworkTopology.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/StabilizeNetworkTopology.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/StabilizeNetworkTopology.java
new file mode 100644
index 0000000..5dce906
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/StabilizeNetworkTopology.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.net;
+
+import org.apache.bookkeeper.util.MathUtils;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.jboss.netty.util.Timeout;
+import org.jboss.netty.util.TimerTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This is going to provide a stabilize network topology regarding to flapping zookeeper registration.
+ */
+public class StabilizeNetworkTopology implements NetworkTopology {
+
+ private static final Logger logger = LoggerFactory.getLogger(StabilizeNetworkTopology.class);
+
+ static class NodeStatus {
+ long lastPresentTime;
+ boolean tentativeToRemove;
+
+ NodeStatus() {
+ this.lastPresentTime = MathUtils.now();
+ }
+
+ synchronized boolean isTentativeToRemove() {
+ return tentativeToRemove;
+ }
+
+ synchronized NodeStatus updateStatus(boolean tentativeToRemove) {
+ this.tentativeToRemove = tentativeToRemove;
+ if (!this.tentativeToRemove) {
+ this.lastPresentTime = MathUtils.now();
+ }
+ return this;
+ }
+
+ synchronized long getLastPresentTime() {
+ return this.lastPresentTime;
+ }
+ }
+
+ protected final NetworkTopologyImpl impl;
+ // timer
+ protected final HashedWheelTimer timer;
+ // statuses
+ protected final ConcurrentMap<Node, NodeStatus> nodeStatuses;
+ // stabilize period seconds
+ protected final long stabilizePeriodMillis;
+
+ private class RemoveNodeTask implements TimerTask {
+
+ private final Node node;
+
+ RemoveNodeTask(Node node) {
+ this.node = node;
+ }
+
+ @Override
+ public void run(Timeout timeout) throws Exception {
+ if (timeout.isCancelled()) {
+ return;
+ }
+ NodeStatus status = nodeStatuses.get(node);
+ if (null == status) {
+ // no status of this node, remove this node from topology
+ impl.remove(node);
+ } else if (status.isTentativeToRemove()) {
+ long millisSinceLastSeen = MathUtils.now() - status.getLastPresentTime();
+ if (millisSinceLastSeen >= stabilizePeriodMillis) {
+ logger.info("Node {} (seen @ {}) becomes stale for {} ms, remove it from the topology.",
+ new Object[] { node, status.getLastPresentTime(), millisSinceLastSeen });
+ impl.remove(node);
+ nodeStatuses.remove(node, status);
+ }
+ }
+ }
+ }
+
+ public StabilizeNetworkTopology(HashedWheelTimer timer,
+ int stabilizePeriodSeconds) {
+ this.impl = new NetworkTopologyImpl();
+ this.timer = timer;
+ this.nodeStatuses = new ConcurrentHashMap<Node, NodeStatus>();
+ this.stabilizePeriodMillis = TimeUnit.SECONDS.toMillis(stabilizePeriodSeconds);
+ }
+
+ void updateNode(Node node, boolean tentativeToRemove) {
+ NodeStatus ns = nodeStatuses.get(node);
+ if (null == ns) {
+ NodeStatus newStatus = new NodeStatus();
+ NodeStatus oldStatus = nodeStatuses.putIfAbsent(node, newStatus);
+ if (null == oldStatus) {
+ ns = newStatus;
+ } else {
+ ns = oldStatus;
+ }
+ }
+ ns.updateStatus(tentativeToRemove);
+ }
+
+ @Override
+ public void add(Node node) {
+ updateNode(node, false);
+ this.impl.add(node);
+ }
+
+ @Override
+ public void remove(Node node) {
+ updateNode(node, true);
+ timer.newTimeout(new RemoveNodeTask(node), stabilizePeriodMillis, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public boolean contains(Node node) {
+ return impl.contains(node);
+ }
+
+ @Override
+ public Node getNode(String loc) {
+ return impl.getNode(loc);
+ }
+
+ @Override
+ public int getNumOfRacks() {
+ return impl.getNumOfRacks();
+ }
+
+ @Override
+ public Set<Node> getLeaves(String loc) {
+ return impl.getLeaves(loc);
+ }
+}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 21b16fb..274fd36 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -180,7 +180,7 @@ public class PerChannelBookieClient extends SimpleChannelHandler implements Chan
this.extRegistry = extRegistry;
StringBuilder nameBuilder = new StringBuilder();
- nameBuilder.append(addr.getHostname().replace('.', '_').replace('-', '_'))
+ nameBuilder.append(addr.getHostName().replace('.', '_').replace('-', '_'))
.append("_").append(addr.getPort());
this.statsLogger = parentStatsLogger.scope(BookKeeperClientStats.CHANNEL_SCOPE)
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java
index 987de7a..8dc7e2d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/BookKeeperConstants.java
@@ -50,4 +50,6 @@ public class BookKeeperConstants {
* memtable (for performance consideration)
*/
public static final long MAX_LOG_SIZE_LIMIT = 1 * 1024 * 1024 * 1024;
+
+ public static final String FEATURE_REPP_DISABLE_DURABILITY_ENFORCEMENT = "repp_disable_durability_enforcement";
}