Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2017/08/21 21:08:26 UTC

hadoop git commit: HDFS-11482. Add storage type demand into DFSNetworkTopology#chooseRandom. Contributed by Chen Liang.

Repository: hadoop
Updated Branches:
  refs/heads/branch-2 89fc7fe67 -> b3ea11dfd


HDFS-11482. Add storage type demand into DFSNetworkTopology#chooseRandom. Contributed by Chen Liang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b3ea11df
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b3ea11df
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b3ea11df

Branch: refs/heads/branch-2
Commit: b3ea11dfdb46fcec86118a132bee9a9978df21dd
Parents: 89fc7fe
Author: Arpit Agarwal <ar...@apache.org>
Authored: Mon Aug 21 14:07:59 2017 -0700
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Mon Aug 21 14:07:59 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/net/InnerNodeImpl.java    |   8 +-
 .../net/NetworkTopologyWithNodeGroup.java       |   2 +-
 .../hadoop/hdfs/net/DFSNetworkTopology.java     | 289 ++++++++++++
 .../hadoop/hdfs/net/DFSTopologyNodeImpl.java    | 277 ++++++++++++
 .../blockmanagement/DatanodeDescriptor.java     |  10 +
 .../apache/hadoop/hdfs/DFSNetworkTopology.java  |  36 --
 .../apache/hadoop/hdfs/DFSTopologyNodeImpl.java | 255 -----------
 .../hadoop/hdfs/TestDFSNetworkTopology.java     | 260 -----------
 .../hadoop/hdfs/net/TestDFSNetworkTopology.java | 449 +++++++++++++++++++
 9 files changed, 1030 insertions(+), 556 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3ea11df/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/InnerNodeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/InnerNodeImpl.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/InnerNodeImpl.java
index 81eaf7f..5a2931b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/InnerNodeImpl.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/InnerNodeImpl.java
@@ -63,7 +63,7 @@ public class InnerNodeImpl extends NodeBase implements InnerNode {
   /** Judge if this node represents a rack
    * @return true if it has no child or its children are not InnerNodes
    */
-  boolean isRack() {
+  public boolean isRack() {
     if (children.isEmpty()) {
       return true;
     }
@@ -81,7 +81,7 @@ public class InnerNodeImpl extends NodeBase implements InnerNode {
    * @param n a node
    * @return true if this node is an ancestor of <i>n</i>
    */
-  protected boolean isAncestor(Node n) {
+  public boolean isAncestor(Node n) {
     return getPath(this).equals(NodeBase.PATH_SEPARATOR_STR) ||
       (n.getNetworkLocation()+NodeBase.PATH_SEPARATOR_STR).
       startsWith(getPath(this)+NodeBase.PATH_SEPARATOR_STR);
@@ -92,12 +92,12 @@ public class InnerNodeImpl extends NodeBase implements InnerNode {
    * @param n a node
    * @return true if this node is the parent of <i>n</i>
    */
-  protected boolean isParent(Node n) {
+  public boolean isParent(Node n) {
     return n.getNetworkLocation().equals(getPath(this));
   }
 
   /* Return a child name of this node who is an ancestor of node <i>n</i> */
-  protected String getNextAncestorName(Node n) {
+  public String getNextAncestorName(Node n) {
     if (!isAncestor(n)) {
       throw new IllegalArgumentException(
                                          this + "is not an ancestor of " + n);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3ea11df/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java
index a20d5fc..bec0fe1 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java
@@ -308,7 +308,7 @@ public class NetworkTopologyWithNodeGroup extends NetworkTopology {
     }
 
     @Override
-    boolean isRack() {
+    public boolean isRack() {
       // it is node group
       if (getChildren().isEmpty()) {
         return false;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3ea11df/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DFSNetworkTopology.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DFSNetworkTopology.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DFSNetworkTopology.java
new file mode 100644
index 0000000..ee83dba
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DFSNetworkTopology.java
@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.net;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.net.NodeBase;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Random;
+
+/**
+ * The HDFS-specific network topology class. The main purpose of this
+ * subclassing is to add a storage-type-aware chooseRandom method. All the
+ * remaining parts should be the same.
+ *
+ * The storage-type-aware choosing is provided by the
+ * chooseRandomWithStorageType methods below.
+ */
+public class DFSNetworkTopology extends NetworkTopology {
+
+  private static final Random RANDOM = new Random();
+
+  public static DFSNetworkTopology getInstance(Configuration conf) {
+    DFSNetworkTopology nt = new DFSNetworkTopology();
+    return (DFSNetworkTopology)nt.init(DFSTopologyNodeImpl.FACTORY);
+  }
+
+  /**
+   * Randomly choose one node from <i>scope</i>, with specified storage type.
+   *
+   * If scope starts with ~, choose one from all nodes except for the
+   * ones in <i>scope</i>; otherwise, choose one from <i>scope</i>.
+   * If excludedNodes is given, choose a node that's not in excludedNodes.
+   *
+   * @param scope range of nodes from which a node will be chosen
+   * @param excludedNodes nodes to be excluded from the choice
+   * @return the chosen node
+   */
+  public Node chooseRandomWithStorageType(final String scope,
+      final Collection<Node> excludedNodes, StorageType type) {
+    netlock.readLock().lock();
+    try {
+      if (scope.startsWith("~")) {
+        return chooseRandomWithStorageType(
+            NodeBase.ROOT, scope.substring(1), excludedNodes, type);
+      } else {
+        return chooseRandomWithStorageType(
+            scope, null, excludedNodes, type);
+      }
+    } finally {
+      netlock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Choose a random node based on given scope, excludedScope and excludedNodes
+   * set. Although in general the topology has at most three layers, this class
+   * will not impose such an assumption.
+   *
+   * At a high level, the idea is as follows:
+   *
+   * Say R has two children A and B, and the storage type of interest is X:
+   * A has X = 6 (rooted at A there are 6 datanodes with X) and B has X = 8.
+   *
+   * Then R generates a random int in the range 1~14; if it is <= 6, we
+   * recursively call into A, otherwise into B. This maintains uniform
+   * randomness when choosing datanodes.
+   *
+   * The tricky part is how to handle excludes.
+   *
+   * For excludedNodes, the set is small: currently the main reason a node is
+   * excluded is that it already has a replica, so randomly picking such a
+   * node again should be rare. Thus we only check the chosen node and, if it
+   * is excluded, do chooseRandom again.
+   *
+   * For excludedScope, we locate the root of the excluded scope and subtract
+   * its counts from all of its ancestors' storage counters, so that the
+   * excluded root is out of the picture.
+   *
+   * TODO : this function duplicates code from NetworkTopology and needs to
+   * be refactored in the future.
+   *
+   * @param scope the scope from which a node is chosen
+   * @param excludedScope the scope to be excluded from the choice
+   * @param excludedNodes nodes to be excluded from the choice
+   * @return the chosen node, or null if no eligible node is found
+   */
+  @VisibleForTesting
+  Node chooseRandomWithStorageType(final String scope,
+      String excludedScope, final Collection<Node> excludedNodes,
+      StorageType type) {
+    if (excludedScope != null) {
+      if (scope.startsWith(excludedScope)) {
+        return null;
+      }
+      if (!excludedScope.startsWith(scope)) {
+        excludedScope = null;
+      }
+    }
+    Node node = getNode(scope);
+    if (node == null) {
+      LOG.debug("Invalid scope {}, non-existing node", scope);
+      return null;
+    }
+    if (!(node instanceof DFSTopologyNodeImpl)) {
+      // a node is either DFSTopologyNodeImpl, or a DatanodeDescriptor
+      return ((DatanodeDescriptor)node).hasStorageType(type) ? node : null;
+    }
+    DFSTopologyNodeImpl root = (DFSTopologyNodeImpl)node;
+    Node excludeRoot = excludedScope == null ? null : getNode(excludedScope);
+
+    // check to see if there are nodes satisfying the condition at all
+    int availableCount = root.getSubtreeStorageCount(type);
+    if (excludeRoot != null && root.isAncestor(excludeRoot)) {
+      if (excludeRoot instanceof DFSTopologyNodeImpl) {
+        availableCount -= ((DFSTopologyNodeImpl)excludeRoot)
+            .getSubtreeStorageCount(type);
+      } else {
+        availableCount -= ((DatanodeDescriptor)excludeRoot)
+            .hasStorageType(type) ? 1 : 0;
+      }
+    }
+    if (excludedNodes != null) {
+      for (Node excludedNode : excludedNodes) {
+        // all excluded nodes should be DatanodeDescriptor
+        Preconditions.checkArgument(excludedNode instanceof DatanodeDescriptor);
+        availableCount -= ((DatanodeDescriptor) excludedNode)
+            .hasStorageType(type) ? 1 : 0;
+      }
+    }
+    if (availableCount <= 0) {
+      // should never be <0 in general, adding <0 check for safety purpose
+      return null;
+    }
+    // at this point, it is guaranteed that there is at least one node
+    // satisfying the requirement; keep trying until we find one.
+    Node chosen;
+    do {
+      chosen = chooseRandomWithStorageTypeAndExcludeRoot(root, excludeRoot,
+          type);
+      if (excludedNodes == null || !excludedNodes.contains(chosen)) {
+        break;
+      } else {
+        LOG.debug("Node {} is excluded, continuing.", chosen);
+      }
+    } while (true);
+    LOG.debug("chooseRandom returning {}", chosen);
+    return chosen;
+  }
+
+  /**
+   * Choose a random node that has the required storage type, under the given
+   * root, with an excluded subtree root (could also just be a leaf node).
+   *
+   * Note that excludedNodes is checked after a random node is chosen, so it
+   * is not handled here.
+   *
+   * @param root the root node where we start searching for a datanode
+   * @param excludeRoot the root of the subtree that should be excluded
+   * @param type the expected storage type
+   * @return a random datanode that has the given storage type and is not in
+   * the excluded scope
+   */
+  private Node chooseRandomWithStorageTypeAndExcludeRoot(
+      DFSTopologyNodeImpl root, Node excludeRoot, StorageType type) {
+    Node chosenNode;
+    if (root.isRack()) {
+      // children are datanode descriptors
+      ArrayList<Node> candidates = new ArrayList<>();
+      for (Node node : root.getChildren()) {
+        if (node.equals(excludeRoot)) {
+          continue;
+        }
+        DatanodeDescriptor dnDescriptor = (DatanodeDescriptor)node;
+        if (dnDescriptor.hasStorageType(type)) {
+          candidates.add(node);
+        }
+      }
+      if (candidates.size() == 0) {
+        return null;
+      }
+      // at this point, all nodes in candidates are valid choices, and they
+      // are all datanodes; pick a random one.
+      chosenNode = candidates.get(RANDOM.nextInt(candidates.size()));
+    } else {
+      // the children are inner nodes
+      ArrayList<DFSTopologyNodeImpl> candidates =
+          getEligibleChildren(root, excludeRoot, type);
+      if (candidates.size() == 0) {
+        return null;
+      }
+      // again, all children are also inner nodes, so we can do this cast.
+      // to maintain uniformity, the search needs to be based on the counts
+      // of valid datanodes. Below is a weighted random choice.
+      int totalCounts = 0;
+      int[] countArray = new int[candidates.size()];
+      for (int i = 0; i < candidates.size(); i++) {
+        DFSTopologyNodeImpl innerNode = candidates.get(i);
+        int subTreeCount = innerNode.getSubtreeStorageCount(type);
+        totalCounts += subTreeCount;
+        countArray[i] = subTreeCount;
+      }
+      // generate a random value in the range [1, totalCounts]
+      int randomCounts = RANDOM.nextInt(totalCounts) + 1;
+      int idxChosen = 0;
+      // searching for idxChosen could potentially be done with binary
+      // search, but it does not seem worth it here.
+      for (int i = 0; i < countArray.length; i++) {
+        if (randomCounts <= countArray[i]) {
+          idxChosen = i;
+          break;
+        }
+        randomCounts -= countArray[i];
+      }
+      DFSTopologyNodeImpl nextRoot = candidates.get(idxChosen);
+      chosenNode = chooseRandomWithStorageTypeAndExcludeRoot(
+          nextRoot, excludeRoot, type);
+    }
+    return chosenNode;
+  }
+
+  /**
+   * Given the root, the excluded root and a storage type, find all children
+   * of the root that have the storage type available. Note that if the
+   * excluded root is under one of the children, that child must subtract the
+   * storage count of the excluded root.
+   * @param root the subtree root we check.
+   * @param excludeRoot the root of the subtree that should be excluded.
+   * @param type the storage type we look for.
+   * @return a list of possible nodes, each of which is eligible as the next
+   * level root we search.
+   */
+  private ArrayList<DFSTopologyNodeImpl> getEligibleChildren(
+      DFSTopologyNodeImpl root, Node excludeRoot, StorageType type) {
+    ArrayList<DFSTopologyNodeImpl> candidates = new ArrayList<>();
+    int excludeCount = 0;
+    if (excludeRoot != null && root.isAncestor(excludeRoot)) {
+      // the subtree to be excluded is under the given root,
+      // find out the number of nodes to be excluded.
+      if (excludeRoot instanceof DFSTopologyNodeImpl) {
+        // if excludedRoot is an inner node, get the count of nodes with
+        // that storage type in its subtree.
+        excludeCount = ((DFSTopologyNodeImpl) excludeRoot)
+            .getSubtreeStorageCount(type);
+      } else {
+        // if excludedRoot is a datanode, simply ignore this one node
+        if (((DatanodeDescriptor) excludeRoot).hasStorageType(type)) {
+          excludeCount = 1;
+        }
+      }
+    }
+    // we have calculated the storage count to be excluded;
+    // walk through all children to check eligibility.
+    for (Node node : root.getChildren()) {
+      DFSTopologyNodeImpl dfsNode = (DFSTopologyNodeImpl) node;
+      int storageCount = dfsNode.getSubtreeStorageCount(type);
+      if (excludeRoot != null && excludeCount != 0 &&
+          (dfsNode.isAncestor(excludeRoot) || dfsNode.equals(excludeRoot))) {
+        storageCount -= excludeCount;
+      }
+      if (storageCount > 0) {
+        candidates.add(dfsNode);
+      }
+    }
+    return candidates;
+  }
+}
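
For illustration only (not part of this commit): a minimal sketch of how the new public chooseRandomWithStorageType API above might be exercised once a DFSNetworkTopology has been populated with DatanodeDescriptor leaves, as the new unit test below does via DFSTestUtil. The scope string "/l1" and the class/method names in the sketch are hypothetical.

    import java.util.ArrayList;
    import java.util.Collection;

    import org.apache.hadoop.fs.StorageType;
    import org.apache.hadoop.hdfs.net.DFSNetworkTopology;
    import org.apache.hadoop.net.Node;

    public class ChooseRandomSketch {
      public static Node pickSsd(DFSNetworkTopology cluster) {
        // Nodes that already hold a replica would normally go here; the
        // implementation expects the elements to be DatanodeDescriptors.
        Collection<Node> excluded = new ArrayList<>();
        // Choose a random datanode under /l1 that has an SSD storage.
        Node inScope = cluster.chooseRandomWithStorageType(
            "/l1", excluded, StorageType.SSD);
        // A leading "~" inverts the scope: pick from anywhere except /l1.
        Node outsideScope = cluster.chooseRandomWithStorageType(
            "~/l1", excluded, StorageType.SSD);
        return inScope != null ? inScope : outsideScope;
      }
    }

The four-argument variant that additionally takes an excludedScope is package-private and marked @VisibleForTesting, so it is only reachable from within org.apache.hadoop.hdfs.net and its tests.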

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3ea11df/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DFSTopologyNodeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DFSTopologyNodeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DFSTopologyNodeImpl.java
new file mode 100644
index 0000000..c00978d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DFSTopologyNodeImpl.java
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.net;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.net.InnerNode;
+import org.apache.hadoop.net.InnerNodeImpl;
+import org.apache.hadoop.net.Node;
+
+import java.util.EnumMap;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The HDFS-specific representation of a network topology inner node. The
+ * difference is that this class includes information about the storage types
+ * available in this subtree. This info will be used when selecting subtrees
+ * in block placement.
+ */
+public class DFSTopologyNodeImpl extends InnerNodeImpl {
+
+  static final InnerNodeImpl.Factory FACTORY
+      = new DFSTopologyNodeImpl.Factory();
+
+  static final class Factory extends InnerNodeImpl.Factory {
+    private Factory() {}
+
+    @Override
+    public InnerNodeImpl newInnerNode(String path) {
+      return new DFSTopologyNodeImpl(path);
+    }
+  }
+
+  /**
+   * The core data structure of this class: the information about what storage
+   * types this subtree has. Basically, a map whose key is a child
+   * id and whose value is an enum map holding the count of each storage type.
+   * E.g. a DISK count of 5 means there are 5 leaf datanodes with DISK storage
+   * available. These values are set/updated when datanodes join and leave.
+   *
+   * NOTE : It might be sufficient to keep only a map from storage type
+   * to count, omitting the child node id. But this might make it hard to keep
+   * consistency when there are updates from children.
+   *
+   * For example, say R currently has two children A and B with storage X, Y:
+   * A : X=1 Y=1
+   * B : X=2 Y=2
+   * so we store X=3 Y=3 as the total on R.
+   *
+   * Now say A has a new X plugged in and becomes X=2 Y=1.
+   *
+   * If we know that "A adds one X", it is easy to update R by +1 on X. However,
+   * if we don't know "A adds one X" but instead get "A now has X=2 Y=1"
+   * (which seems to be the case with the current heartbeat), we will not know
+   * how to update R. Whereas if we store "A has X=1 and Y=1" on R, then we can
+   * simply update R by completely replacing the A entry and all will be good.
+   */
+  private final HashMap
+      <String, EnumMap<StorageType, Integer>> childrenStorageInfo;
+
+  DFSTopologyNodeImpl(String path) {
+    super(path);
+    childrenStorageInfo = new HashMap<>();
+  }
+
+  DFSTopologyNodeImpl(
+      String name, String location, InnerNode parent, int level) {
+    super(name, location, parent, level);
+    childrenStorageInfo = new HashMap<>();
+  }
+
+  public int getSubtreeStorageCount(StorageType type) {
+    int res = 0;
+    for (Map.Entry<String, EnumMap<StorageType, Integer>> entry :
+        childrenStorageInfo.entrySet()) {
+      if (entry.getValue().containsKey(type)) {
+        res += entry.getValue().get(type);
+      }
+    }
+    return res;
+  }
+
+  int getNumOfChildren() {
+    return children.size();
+  }
+
+  @Override
+  public boolean add(Node n) {
+    if (!isAncestor(n)) {
+      throw new IllegalArgumentException(n.getName()
+          + ", which is located at " + n.getNetworkLocation()
+          + ", is not a descendant of " + getPath(this));
+    }
+    // In HDFS topology, the leaf node should always be DatanodeDescriptor
+    if (!(n instanceof DatanodeDescriptor)) {
+      throw new IllegalArgumentException("Unexpected node type "
+          + n.getClass().getName());
+    }
+    DatanodeDescriptor dnDescriptor = (DatanodeDescriptor) n;
+    if (isParent(n)) {
+      // this node is the parent of n; add n directly
+      n.setParent(this);
+      n.setLevel(this.level + 1);
+      Node prev = childrenMap.put(n.getName(), n);
+      if (prev != null) {
+        for(int i=0; i<children.size(); i++) {
+          if (children.get(i).getName().equals(n.getName())) {
+            children.set(i, n);
+            return false;
+          }
+        }
+      }
+      children.add(n);
+      numOfLeaves++;
+      synchronized (childrenStorageInfo) {
+        if (!childrenStorageInfo.containsKey(dnDescriptor.getName())) {
+          childrenStorageInfo.put(
+              dnDescriptor.getName(),
+              new EnumMap<StorageType, Integer>(StorageType.class));
+        }
+        for (StorageType st : dnDescriptor.getStorageTypes()) {
+          childrenStorageInfo.get(dnDescriptor.getName()).put(st, 1);
+        }
+      }
+      return true;
+    } else {
+      // find the next ancestor node
+      String parentName = getNextAncestorName(n);
+      InnerNode parentNode = (InnerNode)childrenMap.get(parentName);
+      if (parentNode == null) {
+        // create a new InnerNode
+        parentNode = createParentNode(parentName);
+        children.add(parentNode);
+        childrenMap.put(parentNode.getName(), parentNode);
+      }
+      // add n to the subtree of the next ancestor node
+      if (parentNode.add(n)) {
+        numOfLeaves++;
+        synchronized (childrenStorageInfo) {
+          if (!childrenStorageInfo.containsKey(parentNode.getName())) {
+            childrenStorageInfo.put(
+                parentNode.getName(),
+                new EnumMap<StorageType, Integer>(StorageType.class));
+            for (StorageType st : dnDescriptor.getStorageTypes()) {
+              childrenStorageInfo.get(parentNode.getName()).put(st, 1);
+            }
+          } else {
+            EnumMap<StorageType, Integer> currentCount =
+                childrenStorageInfo.get(parentNode.getName());
+            for (StorageType st : dnDescriptor.getStorageTypes()) {
+              if (currentCount.containsKey(st)) {
+                currentCount.put(st, currentCount.get(st) + 1);
+              } else {
+                currentCount.put(st, 1);
+              }
+            }
+          }
+        }
+        return true;
+      } else {
+        return false;
+      }
+    }
+  }
+
+  @VisibleForTesting
+  HashMap <String, EnumMap<StorageType, Integer>> getChildrenStorageInfo() {
+    return childrenStorageInfo;
+  }
+
+
+  private DFSTopologyNodeImpl createParentNode(String parentName) {
+    return new DFSTopologyNodeImpl(
+        parentName, getPath(this), this, this.getLevel() + 1);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    return super.equals(o);
+  }
+
+  @Override
+  public int hashCode() {
+    return super.hashCode();
+  }
+
+  @Override
+  public boolean remove(Node n) {
+    if (!isAncestor(n)) {
+      throw new IllegalArgumentException(n.getName()
+          + ", which is located at " + n.getNetworkLocation()
+          + ", is not a descendant of " + getPath(this));
+    }
+    // In HDFS topology, the leaf node should always be DatanodeDescriptor
+    if (!(n instanceof DatanodeDescriptor)) {
+      throw new IllegalArgumentException("Unexpected node type "
+          + n.getClass().getName());
+    }
+    DatanodeDescriptor dnDescriptor = (DatanodeDescriptor) n;
+    if (isParent(n)) {
+      // this node is the parent of n; remove n directly
+      if (childrenMap.containsKey(n.getName())) {
+        for (int i=0; i<children.size(); i++) {
+          if (children.get(i).getName().equals(n.getName())) {
+            children.remove(i);
+            childrenMap.remove(n.getName());
+            synchronized (childrenStorageInfo) {
+              childrenStorageInfo.remove(dnDescriptor.getName());
+            }
+            numOfLeaves--;
+            n.setParent(null);
+            return true;
+          }
+        }
+      }
+      return false;
+    } else {
+      // find the next ancestor node: the parent node
+      String parentName = getNextAncestorName(n);
+      DFSTopologyNodeImpl parentNode =
+          (DFSTopologyNodeImpl)childrenMap.get(parentName);
+      if (parentNode == null) {
+        return false;
+      }
+      // remove n from the parent node
+      boolean isRemoved = parentNode.remove(n);
+      if (isRemoved) {
+        // update counts; if the parent node has no children, remove it too
+        synchronized (childrenStorageInfo) {
+          EnumMap<StorageType, Integer> currentCount =
+              childrenStorageInfo.get(parentNode.getName());
+          EnumSet<StorageType> toRemove = EnumSet.noneOf(StorageType.class);
+          for (StorageType st : dnDescriptor.getStorageTypes()) {
+            int newCount = currentCount.get(st) - 1;
+            if (newCount == 0) {
+              toRemove.add(st);
+            }
+            currentCount.put(st, newCount);
+          }
+          for (StorageType st : toRemove) {
+            currentCount.remove(st);
+          }
+        }
+        if (parentNode.getNumOfChildren() == 0) {
+          for(int i=0; i < children.size(); i++) {
+            if (children.get(i).getName().equals(parentName)) {
+              children.remove(i);
+              childrenMap.remove(parentName);
+              childrenStorageInfo.remove(parentNode.getName());
+              break;
+            }
+          }
+        }
+        numOfLeaves--;
+      }
+      return isRemoved;
+    }
+  }
+}
\ No newline at end of file
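
For illustration only (not part of this commit): a minimal sketch of how the per-child bookkeeping above rolls up through the public getSubtreeStorageCount method. It assumes a DFSNetworkTopology already populated with datanodes under "/l2/d4", matching the layout of the new unit test below; the class and method names in the sketch are hypothetical.

    import org.apache.hadoop.fs.StorageType;
    import org.apache.hadoop.hdfs.net.DFSNetworkTopology;
    import org.apache.hadoop.hdfs.net.DFSTopologyNodeImpl;

    public class StorageCountSketch {
      // Count the datanodes under the inner node "/l2/d4" that have at least
      // one storage of the given type, as aggregated from the per-child
      // childrenStorageInfo map that add()/remove() keep up to date.
      public static int countUnder(DFSNetworkTopology cluster,
          StorageType type) {
        DFSTopologyNodeImpl d4 =
            (DFSTopologyNodeImpl) cluster.getNode("/l2/d4");
        return d4.getSubtreeStorageCount(type);
      }
    }

Keeping the counts per child, rather than a single aggregate, is what lets add() and remove() simply replace or drop a child's entry, as the javadoc above explains.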

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3ea11df/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index 3cf3ea0..77ab6b6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -923,5 +923,15 @@ public class DatanodeDescriptor extends DatanodeInfo {
   public boolean isRegistered() {
     return isAlive() && !forceRegistration;
   }
+
+
+  public boolean hasStorageType(StorageType type) {
+    for (DatanodeStorageInfo dnStorage : getStorageInfos()) {
+      if (dnStorage.getStorageType() == type) {
+        return true;
+      }
+    }
+    return false;
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3ea11df/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSNetworkTopology.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSNetworkTopology.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSNetworkTopology.java
deleted file mode 100644
index a6b8c00..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSNetworkTopology.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.net.NetworkTopology;
-
-/**
- * The HDFS specific network topology class. The main purpose of doing this
- * subclassing is to add storage-type-aware chooseRandom method. All the
- * remaining parts should be the same.
- *
- * Currently a placeholder to test storage type info.
- * TODO : add "chooseRandom with storageType info" function.
- */
-public class DFSNetworkTopology extends NetworkTopology {
-  public static DFSNetworkTopology getInstance(Configuration conf) {
-    DFSNetworkTopology nt = new DFSNetworkTopology();
-    return (DFSNetworkTopology)nt.init(DFSTopologyNodeImpl.FACTORY);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3ea11df/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTopologyNodeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTopologyNodeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTopologyNodeImpl.java
deleted file mode 100644
index aee9fa3..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTopologyNodeImpl.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
-import org.apache.hadoop.net.InnerNode;
-import org.apache.hadoop.net.InnerNodeImpl;
-import org.apache.hadoop.net.Node;
-
-import java.util.EnumMap;
-import java.util.EnumSet;
-import java.util.HashMap;
-
-/**
- * The HDFS-specific representation of a network topology inner node. The
- * difference is this class includes the information about the storage type
- * info of this subtree. This info will be used when selecting subtrees
- * in block placement.
- */
-public class DFSTopologyNodeImpl extends InnerNodeImpl {
-
-  static final InnerNodeImpl.Factory FACTORY
-      = new DFSTopologyNodeImpl.Factory();
-
-  static final class Factory extends InnerNodeImpl.Factory {
-    private Factory() {}
-
-    @Override
-    public InnerNodeImpl newInnerNode(String path) {
-      return new DFSTopologyNodeImpl(path);
-    }
-  }
-
-  /**
-   * The core data structure of this class. The information about what storage
-   * types this subtree has. Basically, a map whose key is a child
-   * id, value is a enum map including the counts of each storage type. e.g.
-   * DISK type has count 5 means there are 5 leaf datanodes with DISK type
-   * available. This value is set/updated upon datanode joining and leaving.
-   *
-   * NOTE : It might be sufficient to keep only a map from storage type
-   * to count, omitting the child node id. But this might make it hard to keep
-   * consistency when there are updates from children.
-   *
-   * For example, if currently R has two children A and B with storage X, Y, and
-   * A : X=1 Y=1
-   * B : X=2 Y=2
-   * so we store X=3 Y=3 as total on R.
-   *
-   * Now say A has a new X plugged in and becomes X=2 Y=1.
-   *
-   * If we know that "A adds one X", it is easy to update R by +1 on X. However,
-   * if we don't know "A adds one X", but instead got "A now has X=2 Y=1",
-   * (which seems to be the case in current heartbeat) we will not know how to
-   * update R. While if we store on R "A has X=1 and Y=1" then we can simply
-   * update R by completely replacing the A entry and all will be good.
-   */
-  private final HashMap
-      <String, EnumMap<StorageType, Integer>> childrenStorageInfo;
-
-  DFSTopologyNodeImpl(String path) {
-    super(path);
-    childrenStorageInfo = new HashMap<>();
-  }
-
-  DFSTopologyNodeImpl(
-      String name, String location, InnerNode parent, int level) {
-    super(name, location, parent, level);
-    childrenStorageInfo = new HashMap<>();
-  }
-
-  int getNumOfChildren() {
-    return children.size();
-  }
-
-  @Override
-  public boolean add(Node n) {
-    if (!isAncestor(n)) {
-      throw new IllegalArgumentException(n.getName()
-          + ", which is located at " + n.getNetworkLocation()
-          + ", is not a descendant of " + getPath(this));
-    }
-    // In HDFS topology, the leaf node should always be DatanodeDescriptor
-    if (!(n instanceof DatanodeDescriptor)) {
-      throw new IllegalArgumentException("Unexpected node type "
-          + n.getClass().getName());
-    }
-    DatanodeDescriptor dnDescriptor = (DatanodeDescriptor) n;
-    if (isParent(n)) {
-      // this node is the parent of n; add n directly
-      n.setParent(this);
-      n.setLevel(this.level + 1);
-      Node prev = childrenMap.put(n.getName(), n);
-      if (prev != null) {
-        for(int i=0; i<children.size(); i++) {
-          if (children.get(i).getName().equals(n.getName())) {
-            children.set(i, n);
-            return false;
-          }
-        }
-      }
-      children.add(n);
-      numOfLeaves++;
-      synchronized (childrenStorageInfo) {
-        if (!childrenStorageInfo.containsKey(dnDescriptor.getName())) {
-          childrenStorageInfo.put(
-              dnDescriptor.getName(),
-              new EnumMap<StorageType, Integer>(StorageType.class));
-        }
-        for (StorageType st : dnDescriptor.getStorageTypes()) {
-          childrenStorageInfo.get(dnDescriptor.getName()).put(st, 1);
-        }
-      }
-      return true;
-    } else {
-      // find the next ancestor node
-      String parentName = getNextAncestorName(n);
-      InnerNode parentNode = (InnerNode)childrenMap.get(parentName);
-      if (parentNode == null) {
-        // create a new InnerNode
-        parentNode = createParentNode(parentName);
-        children.add(parentNode);
-        childrenMap.put(parentNode.getName(), parentNode);
-      }
-      // add n to the subtree of the next ancestor node
-      if (parentNode.add(n)) {
-        numOfLeaves++;
-        synchronized (childrenStorageInfo) {
-          if (!childrenStorageInfo.containsKey(parentNode.getName())) {
-            childrenStorageInfo.put(
-                parentNode.getName(),
-                new EnumMap<StorageType, Integer>(StorageType.class));
-            for (StorageType st : dnDescriptor.getStorageTypes()) {
-              childrenStorageInfo.get(parentNode.getName()).put(st, 1);
-            }
-          } else {
-            EnumMap<StorageType, Integer> currentCount =
-                childrenStorageInfo.get(parentNode.getName());
-            for (StorageType st : dnDescriptor.getStorageTypes()) {
-              if (currentCount.containsKey(st)) {
-                currentCount.put(st, currentCount.get(st) + 1);
-              } else {
-                currentCount.put(st, 1);
-              }
-            }
-          }
-        }
-        return true;
-      } else {
-        return false;
-      }
-    }
-  }
-
-  @VisibleForTesting
-  HashMap <String, EnumMap<StorageType, Integer>> getChildrenStorageInfo() {
-    return childrenStorageInfo;
-  }
-
-
-  private DFSTopologyNodeImpl createParentNode(String parentName) {
-    return new DFSTopologyNodeImpl(
-        parentName, getPath(this), this, this.getLevel() + 1);
-  }
-
-  @Override
-  public boolean remove(Node n) {
-    if (!isAncestor(n)) {
-      throw new IllegalArgumentException(n.getName()
-          + ", which is located at " + n.getNetworkLocation()
-          + ", is not a descendant of " + getPath(this));
-    }
-    // In HDFS topology, the leaf node should always be DatanodeDescriptor
-    if (!(n instanceof DatanodeDescriptor)) {
-      throw new IllegalArgumentException("Unexpected node type "
-          + n.getClass().getName());
-    }
-    DatanodeDescriptor dnDescriptor = (DatanodeDescriptor) n;
-    if (isParent(n)) {
-      // this node is the parent of n; remove n directly
-      if (childrenMap.containsKey(n.getName())) {
-        for (int i=0; i<children.size(); i++) {
-          if (children.get(i).getName().equals(n.getName())) {
-            children.remove(i);
-            childrenMap.remove(n.getName());
-            synchronized (childrenStorageInfo) {
-              childrenStorageInfo.remove(dnDescriptor.getName());
-            }
-            numOfLeaves--;
-            n.setParent(null);
-            return true;
-          }
-        }
-      }
-      return false;
-    } else {
-      // find the next ancestor node: the parent node
-      String parentName = getNextAncestorName(n);
-      DFSTopologyNodeImpl parentNode =
-          (DFSTopologyNodeImpl)childrenMap.get(parentName);
-      if (parentNode == null) {
-        return false;
-      }
-      // remove n from the parent node
-      boolean isRemoved = parentNode.remove(n);
-      if (isRemoved) {
-        // if the parent node has no children, remove the parent node too
-        synchronized (childrenStorageInfo) {
-          EnumMap<StorageType, Integer> currentCount =
-              childrenStorageInfo.get(parentNode.getName());
-          EnumSet<StorageType> toRemove = EnumSet.noneOf(StorageType.class);
-          for (StorageType st : dnDescriptor.getStorageTypes()) {
-            int newCount = currentCount.get(st) - 1;
-            if (newCount == 0) {
-              toRemove.add(st);
-            }
-            currentCount.put(st, newCount);
-          }
-          for (StorageType st : toRemove) {
-            currentCount.remove(st);
-          }
-        }
-        if (parentNode.getNumOfChildren() == 0) {
-          for(int i=0; i < children.size(); i++) {
-            if (children.get(i).getName().equals(parentName)) {
-              children.remove(i);
-              childrenMap.remove(parentName);
-              childrenStorageInfo.remove(parentNode.getName());
-              break;
-            }
-          }
-        }
-        numOfLeaves--;
-      }
-      return isRemoved;
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3ea11df/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSNetworkTopology.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSNetworkTopology.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSNetworkTopology.java
deleted file mode 100644
index ac1edf9..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSNetworkTopology.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.Timeout;
-
-import java.util.EnumMap;
-import java.util.HashMap;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-/**
- * This class tests the correctness of storage type info stored in
- * DFSNetworkTopology.
- */
-public class TestDFSNetworkTopology {
-  private static final Log LOG =
-      LogFactory.getLog(TestDFSNetworkTopology.class);
-  private final static DFSNetworkTopology CLUSTER =
-      DFSNetworkTopology.getInstance(new Configuration());
-  private DatanodeDescriptor[] dataNodes;
-
-  @Rule
-  public Timeout testTimeout = new Timeout(30000);
-
-  @Before
-  public void setupDatanodes() {
-    final String[] racks = {
-        "/l1/d1/r1", "/l1/d1/r1", "/l1/d1/r2", "/l1/d1/r2", "/l1/d1/r2",
-
-        "/l1/d2/r3", "/l1/d2/r3", "/l1/d2/r3",
-
-        "/l2/d3/r1", "/l2/d3/r2", "/l2/d3/r3", "/l2/d3/r4", "/l2/d3/r5",
-
-        "/l2/d4/r1", "/l2/d4/r1", "/l2/d4/r1", "/l2/d4/r1", "/l2/d4/r1",
-        "/l2/d4/r1", "/l2/d4/r1"};
-    final String[] hosts = {
-        "host1", "host2", "host3", "host4", "host5",
-        "host6", "host7", "host8", "host9", "host10",
-        "host11", "host12", "host13", "host14", "host15",
-        "host16", "host17", "host18", "host19", "host20"};
-    final StorageType[] types = {
-        StorageType.ARCHIVE, StorageType.DISK, StorageType.ARCHIVE,
-        StorageType.DISK, StorageType.DISK,
-
-        StorageType.DISK, StorageType.RAM_DISK, StorageType.SSD,
-
-        StorageType.DISK, StorageType.RAM_DISK, StorageType.DISK,
-        StorageType.ARCHIVE, StorageType.ARCHIVE,
-
-        StorageType.DISK, StorageType.DISK, StorageType.RAM_DISK,
-        StorageType.RAM_DISK, StorageType.ARCHIVE, StorageType.ARCHIVE,
-        StorageType.SSD};
-    final DatanodeStorageInfo[] storages =
-        DFSTestUtil.createDatanodeStorageInfos(20, racks, hosts, types);
-    dataNodes = DFSTestUtil.toDatanodeDescriptor(storages);
-    for (int i = 0; i < dataNodes.length; i++) {
-      CLUSTER.add(dataNodes[i]);
-    }
-    dataNodes[9].setDecommissioned();
-    dataNodes[10].setDecommissioned();
-  }
-
-  /**
-   * Test getting the storage type info of subtree.
-   * @throws Exception
-   */
-  @Test
-  public void testGetStorageTypeInfo() throws Exception {
-    // checking level = 2 nodes
-    DFSTopologyNodeImpl d1 =
-        (DFSTopologyNodeImpl) CLUSTER.getNode("/l1/d1");
-    HashMap<String, EnumMap<StorageType, Integer>> d1info =
-        d1.getChildrenStorageInfo();
-    assertEquals(2, d1info.keySet().size());
-    assertTrue(d1info.get("r1").size() == 2 && d1info.get("r2").size() == 2);
-    assertEquals(1, (int)d1info.get("r1").get(StorageType.DISK));
-    assertEquals(1, (int)d1info.get("r1").get(StorageType.ARCHIVE));
-    assertEquals(2, (int)d1info.get("r2").get(StorageType.DISK));
-    assertEquals(1, (int)d1info.get("r2").get(StorageType.ARCHIVE));
-
-    DFSTopologyNodeImpl d2 =
-        (DFSTopologyNodeImpl) CLUSTER.getNode("/l1/d2");
-    HashMap<String, EnumMap<StorageType, Integer>> d2info =
-        d2.getChildrenStorageInfo();
-    assertEquals(1, d2info.keySet().size());
-    assertTrue(d2info.get("r3").size() == 3);
-    assertEquals(1, (int)d2info.get("r3").get(StorageType.DISK));
-    assertEquals(1, (int)d2info.get("r3").get(StorageType.RAM_DISK));
-    assertEquals(1, (int)d2info.get("r3").get(StorageType.SSD));
-
-    DFSTopologyNodeImpl d3 =
-        (DFSTopologyNodeImpl) CLUSTER.getNode("/l2/d3");
-    HashMap<String, EnumMap<StorageType, Integer>> d3info =
-        d3.getChildrenStorageInfo();
-    assertEquals(5, d3info.keySet().size());
-    assertEquals(1, (int)d3info.get("r1").get(StorageType.DISK));
-    assertEquals(1, (int)d3info.get("r2").get(StorageType.RAM_DISK));
-    assertEquals(1, (int)d3info.get("r3").get(StorageType.DISK));
-    assertEquals(1, (int)d3info.get("r4").get(StorageType.ARCHIVE));
-    assertEquals(1, (int)d3info.get("r5").get(StorageType.ARCHIVE));
-
-    DFSTopologyNodeImpl d4 =
-        (DFSTopologyNodeImpl) CLUSTER.getNode("/l2/d4");
-    HashMap<String, EnumMap<StorageType, Integer>> d4info =
-        d4.getChildrenStorageInfo();
-    assertEquals(1, d4info.keySet().size());
-    assertEquals(2, (int)d4info.get("r1").get(StorageType.DISK));
-    assertEquals(2, (int)d4info.get("r1").get(StorageType.RAM_DISK));
-    assertEquals(2, (int)d4info.get("r1").get(StorageType.ARCHIVE));
-    assertEquals(1, (int)d4info.get("r1").get(StorageType.SSD));
-
-    DFSTopologyNodeImpl l1 =
-        (DFSTopologyNodeImpl) CLUSTER.getNode("/l1");
-    HashMap<String, EnumMap<StorageType, Integer>> l1info =
-        l1.getChildrenStorageInfo();
-    assertEquals(2, l1info.keySet().size());
-    assertTrue(l1info.get("d1").size() == 2
-        && l1info.get("d2").size() == 3);
-    assertEquals(2, (int)l1info.get("d1").get(StorageType.ARCHIVE));
-    assertEquals(3, (int)l1info.get("d1").get(StorageType.DISK));
-    assertEquals(1, (int)l1info.get("d2").get(StorageType.DISK));
-    assertEquals(1, (int)l1info.get("d2").get(StorageType.RAM_DISK));
-    assertEquals(1, (int)l1info.get("d2").get(StorageType.SSD));
-
-    // checking level = 1 nodes
-    DFSTopologyNodeImpl l2 =
-        (DFSTopologyNodeImpl) CLUSTER.getNode("/l2");
-    HashMap<String, EnumMap<StorageType, Integer>> l2info =
-        l2.getChildrenStorageInfo();
-    assertTrue(l2info.get("d3").size() == 3
-        && l2info.get("d4").size() == 4);
-    assertEquals(2, l2info.keySet().size());
-    assertEquals(2, (int)l2info.get("d3").get(StorageType.DISK));
-    assertEquals(2, (int)l2info.get("d3").get(StorageType.ARCHIVE));
-    assertEquals(1, (int)l2info.get("d3").get(StorageType.RAM_DISK));
-    assertEquals(2, (int)l2info.get("d4").get(StorageType.DISK));
-    assertEquals(2, (int)l2info.get("d4").get(StorageType.ARCHIVE));
-    assertEquals(2, (int)l2info.get("d4").get(StorageType.RAM_DISK));
-    assertEquals(1, (int)l2info.get("d4").get(StorageType.SSD));
-  }
-
-  /**
-   * Test the correctness of storage type info when nodes are added and removed.
-   * @throws Exception
-   */
-  @Test
-  public void testAddAndRemoveTopology() throws Exception {
-    String[] newRack = {"/l1/d1/r1", "/l1/d1/r3", "/l1/d3/r3", "/l1/d3/r3"};
-    String[] newHost = {"nhost1", "nhost2", "nhost3", "nhost4"};
-    String[] newips = {"30.30.30.30", "31.31.31.31", "32.32.32.32",
-        "33.33.33.33"};
-    StorageType[] newTypes = {StorageType.DISK, StorageType.SSD,
-        StorageType.SSD, StorageType.SSD};
-    DatanodeDescriptor[] newDD = new DatanodeDescriptor[4];
-
-    for (int i = 0; i<4; i++) {
-      DatanodeStorageInfo dsi = DFSTestUtil.createDatanodeStorageInfo(
-          "s" + newHost[i], newips[i], newRack[i], newHost[i],
-          newTypes[i], null);
-      newDD[i] = dsi.getDatanodeDescriptor();
-      CLUSTER.add(newDD[i]);
-    }
-
-    DFSTopologyNodeImpl d1 =
-        (DFSTopologyNodeImpl) CLUSTER.getNode("/l1/d1");
-    HashMap<String, EnumMap<StorageType, Integer>> d1info =
-        d1.getChildrenStorageInfo();
-    assertEquals(3, d1info.keySet().size());
-    assertTrue(d1info.get("r1").size() == 2 && d1info.get("r2").size() == 2
-      && d1info.get("r3").size() == 1);
-    assertEquals(2, (int)d1info.get("r1").get(StorageType.DISK));
-    assertEquals(1, (int)d1info.get("r1").get(StorageType.ARCHIVE));
-    assertEquals(2, (int)d1info.get("r2").get(StorageType.DISK));
-    assertEquals(1, (int)d1info.get("r2").get(StorageType.ARCHIVE));
-    assertEquals(1, (int)d1info.get("r3").get(StorageType.SSD));
-
-    DFSTopologyNodeImpl d3 =
-        (DFSTopologyNodeImpl) CLUSTER.getNode("/l1/d3");
-    HashMap<String, EnumMap<StorageType, Integer>> d3info =
-        d3.getChildrenStorageInfo();
-    assertEquals(1, d3info.keySet().size());
-    assertTrue(d3info.get("r3").size() == 1);
-    assertEquals(2, (int)d3info.get("r3").get(StorageType.SSD));
-
-    DFSTopologyNodeImpl l1 =
-        (DFSTopologyNodeImpl) CLUSTER.getNode("/l1");
-    HashMap<String, EnumMap<StorageType, Integer>> l1info =
-        l1.getChildrenStorageInfo();
-    assertEquals(3, l1info.keySet().size());
-    assertTrue(l1info.get("d1").size() == 3 &&
-        l1info.get("d2").size() == 3 && l1info.get("d3").size() == 1);
-    assertEquals(4, (int)l1info.get("d1").get(StorageType.DISK));
-    assertEquals(2, (int)l1info.get("d1").get(StorageType.ARCHIVE));
-    assertEquals(1, (int)l1info.get("d1").get(StorageType.SSD));
-    assertEquals(1, (int)l1info.get("d2").get(StorageType.SSD));
-    assertEquals(1, (int)l1info.get("d2").get(StorageType.RAM_DISK));
-    assertEquals(1, (int)l1info.get("d2").get(StorageType.DISK));
-    assertEquals(2, (int)l1info.get("d3").get(StorageType.SSD));
-
-
-    for (int i = 0; i<4; i++) {
-      CLUSTER.remove(newDD[i]);
-    }
-
-    // /d1/r3 should've been out, /d1/r1 should've been resumed
-    DFSTopologyNodeImpl nd1 =
-        (DFSTopologyNodeImpl) CLUSTER.getNode("/l1/d1");
-    HashMap<String, EnumMap<StorageType, Integer>> nd1info =
-        nd1.getChildrenStorageInfo();
-    assertEquals(2, nd1info.keySet().size());
-    assertTrue(nd1info.get("r1").size() == 2 && nd1info.get("r2").size() == 2);
-    assertEquals(1, (int)nd1info.get("r1").get(StorageType.DISK));
-    assertEquals(1, (int)nd1info.get("r1").get(StorageType.ARCHIVE));
-    assertEquals(2, (int)nd1info.get("r2").get(StorageType.DISK));
-    assertEquals(1, (int)nd1info.get("r2").get(StorageType.ARCHIVE));
-
-    // /l1/d3 should've been out, and /l1/d1 should've been resumed
-    DFSTopologyNodeImpl nl1 =
-        (DFSTopologyNodeImpl) CLUSTER.getNode("/l1");
-    HashMap<String, EnumMap<StorageType, Integer>> nl1info =
-        nl1.getChildrenStorageInfo();
-    assertEquals(2, nl1info.keySet().size());
-    assertTrue(l1info.get("d1").size() == 2
-        && l1info.get("d2").size() == 3);
-    assertEquals(2, (int)nl1info.get("d1").get(StorageType.ARCHIVE));
-    assertEquals(3, (int)nl1info.get("d1").get(StorageType.DISK));
-    assertEquals(1, (int)l1info.get("d2").get(StorageType.DISK));
-    assertEquals(1, (int)l1info.get("d2").get(StorageType.RAM_DISK));
-    assertEquals(1, (int)l1info.get("d2").get(StorageType.SSD));
-
-    assertNull(CLUSTER.getNode("/l1/d3"));
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3ea11df/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/net/TestDFSNetworkTopology.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/net/TestDFSNetworkTopology.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/net/TestDFSNetworkTopology.java
new file mode 100644
index 0000000..32ecf886
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/net/TestDFSNetworkTopology.java
@@ -0,0 +1,449 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.net;
+
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
+import org.apache.hadoop.net.Node;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * This class tests the correctness of storage type info stored in
+ * DFSNetworkTopology.
+ */
+public class TestDFSNetworkTopology {
+  private static final Log LOG =
+      LogFactory.getLog(TestDFSNetworkTopology.class);
+  private final static DFSNetworkTopology CLUSTER =
+      DFSNetworkTopology.getInstance(new Configuration());
+  private DatanodeDescriptor[] dataNodes;
+
+  @Rule
+  public Timeout testTimeout = new Timeout(30000);
+
+  @Before
+  public void setupDatanodes() {
+    final String[] racks = {
+        "/l1/d1/r1", "/l1/d1/r1", "/l1/d1/r2", "/l1/d1/r2", "/l1/d1/r2",
+
+        "/l1/d2/r3", "/l1/d2/r3", "/l1/d2/r3",
+
+        "/l2/d3/r1", "/l2/d3/r2", "/l2/d3/r3", "/l2/d3/r4", "/l2/d3/r5",
+
+        "/l2/d4/r1", "/l2/d4/r1", "/l2/d4/r1", "/l2/d4/r1", "/l2/d4/r1",
+        "/l2/d4/r1", "/l2/d4/r1"};
+    final String[] hosts = {
+        "host1", "host2", "host3", "host4", "host5",
+        "host6", "host7", "host8",
+        "host9", "host10", "host11", "host12", "host13",
+        "host14", "host15", "host16", "host17", "host18", "host19", "host20"};
+    final StorageType[] types = {
+        StorageType.ARCHIVE, StorageType.DISK, StorageType.ARCHIVE,
+        StorageType.DISK, StorageType.DISK,
+
+        StorageType.DISK, StorageType.RAM_DISK, StorageType.SSD,
+
+        StorageType.DISK, StorageType.RAM_DISK, StorageType.DISK,
+        StorageType.ARCHIVE, StorageType.ARCHIVE,
+
+        StorageType.DISK, StorageType.DISK, StorageType.RAM_DISK,
+        StorageType.RAM_DISK, StorageType.ARCHIVE, StorageType.ARCHIVE,
+        StorageType.SSD};
+    final DatanodeStorageInfo[] storages =
+        DFSTestUtil.createDatanodeStorageInfos(20, racks, hosts, types);
+    dataNodes = DFSTestUtil.toDatanodeDescriptor(storages);
+    for (int i = 0; i < dataNodes.length; i++) {
+      CLUSTER.add(dataNodes[i]);
+    }
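+    // mark host10 and host11 as decommissioned; they remain in the topology
+    // and still contribute to the storage type counts asserted in these tests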
+    dataNodes[9].setDecommissioned();
+    dataNodes[10].setDecommissioned();
+  }
+
+  /**
+   * Test getting the storage type info of a subtree.
+   * @throws Exception
+   */
+  @Test
+  public void testGetStorageTypeInfo() throws Exception {
+    // checking level = 2 nodes
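+    // e.g. /l1/d1/r1 holds host1 (ARCHIVE) and host2 (DISK), so its entry
+    // should map to {ARCHIVE=1, DISK=1}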
+    DFSTopologyNodeImpl d1 =
+        (DFSTopologyNodeImpl) CLUSTER.getNode("/l1/d1");
+    HashMap<String, EnumMap<StorageType, Integer>> d1info =
+        d1.getChildrenStorageInfo();
+    assertEquals(2, d1info.keySet().size());
+    assertTrue(d1info.get("r1").size() == 2 && d1info.get("r2").size() == 2);
+    assertEquals(1, (int)d1info.get("r1").get(StorageType.DISK));
+    assertEquals(1, (int)d1info.get("r1").get(StorageType.ARCHIVE));
+    assertEquals(2, (int)d1info.get("r2").get(StorageType.DISK));
+    assertEquals(1, (int)d1info.get("r2").get(StorageType.ARCHIVE));
+
+    DFSTopologyNodeImpl d2 =
+        (DFSTopologyNodeImpl) CLUSTER.getNode("/l1/d2");
+    HashMap<String, EnumMap<StorageType, Integer>> d2info =
+        d2.getChildrenStorageInfo();
+    assertEquals(1, d2info.keySet().size());
+    assertTrue(d2info.get("r3").size() == 3);
+    assertEquals(1, (int)d2info.get("r3").get(StorageType.DISK));
+    assertEquals(1, (int)d2info.get("r3").get(StorageType.RAM_DISK));
+    assertEquals(1, (int)d2info.get("r3").get(StorageType.SSD));
+
+    DFSTopologyNodeImpl d3 =
+        (DFSTopologyNodeImpl) CLUSTER.getNode("/l2/d3");
+    HashMap<String, EnumMap<StorageType, Integer>> d3info =
+        d3.getChildrenStorageInfo();
+    assertEquals(5, d3info.keySet().size());
+    assertEquals(1, (int)d3info.get("r1").get(StorageType.DISK));
+    assertEquals(1, (int)d3info.get("r2").get(StorageType.RAM_DISK));
+    assertEquals(1, (int)d3info.get("r3").get(StorageType.DISK));
+    assertEquals(1, (int)d3info.get("r4").get(StorageType.ARCHIVE));
+    assertEquals(1, (int)d3info.get("r5").get(StorageType.ARCHIVE));
+
+    DFSTopologyNodeImpl d4 =
+        (DFSTopologyNodeImpl) CLUSTER.getNode("/l2/d4");
+    HashMap<String, EnumMap<StorageType, Integer>> d4info =
+        d4.getChildrenStorageInfo();
+    assertEquals(1, d4info.keySet().size());
+    assertEquals(2, (int)d4info.get("r1").get(StorageType.DISK));
+    assertEquals(2, (int)d4info.get("r1").get(StorageType.RAM_DISK));
+    assertEquals(2, (int)d4info.get("r1").get(StorageType.ARCHIVE));
+    assertEquals(1, (int)d4info.get("r1").get(StorageType.SSD));
+
+    DFSTopologyNodeImpl l1 =
+        (DFSTopologyNodeImpl) CLUSTER.getNode("/l1");
+    HashMap<String, EnumMap<StorageType, Integer>> l1info =
+        l1.getChildrenStorageInfo();
+    assertEquals(2, l1info.keySet().size());
+    assertTrue(l1info.get("d1").size() == 2
+        && l1info.get("d2").size() == 3);
+    assertEquals(2, (int)l1info.get("d1").get(StorageType.ARCHIVE));
+    assertEquals(3, (int)l1info.get("d1").get(StorageType.DISK));
+    assertEquals(1, (int)l1info.get("d2").get(StorageType.DISK));
+    assertEquals(1, (int)l1info.get("d2").get(StorageType.RAM_DISK));
+    assertEquals(1, (int)l1info.get("d2").get(StorageType.SSD));
+
+    // checking level = 1 nodes
+    DFSTopologyNodeImpl l2 =
+        (DFSTopologyNodeImpl) CLUSTER.getNode("/l2");
+    HashMap<String, EnumMap<StorageType, Integer>> l2info =
+        l2.getChildrenStorageInfo();
+    assertTrue(l2info.get("d3").size() == 3
+        && l2info.get("d4").size() == 4);
+    assertEquals(2, l2info.keySet().size());
+    assertEquals(2, (int)l2info.get("d3").get(StorageType.DISK));
+    assertEquals(2, (int)l2info.get("d3").get(StorageType.ARCHIVE));
+    assertEquals(1, (int)l2info.get("d3").get(StorageType.RAM_DISK));
+    assertEquals(2, (int)l2info.get("d4").get(StorageType.DISK));
+    assertEquals(2, (int)l2info.get("d4").get(StorageType.ARCHIVE));
+    assertEquals(2, (int)l2info.get("d4").get(StorageType.RAM_DISK));
+    assertEquals(1, (int)l2info.get("d4").get(StorageType.SSD));
+  }
+
+  /**
+   * Test the correctness of storage type info when nodes are added and removed.
+   * @throws Exception
+   */
+  @Test
+  public void testAddAndRemoveTopology() throws Exception {
+    String[] newRack = {"/l1/d1/r1", "/l1/d1/r3", "/l1/d3/r3", "/l1/d3/r3"};
+    String[] newHost = {"nhost1", "nhost2", "nhost3", "nhost4"};
+    String[] newips = {"30.30.30.30", "31.31.31.31", "32.32.32.32",
+        "33.33.33.33"};
+    StorageType[] newTypes = {StorageType.DISK, StorageType.SSD,
+        StorageType.SSD, StorageType.SSD};
+    DatanodeDescriptor[] newDD = new DatanodeDescriptor[4];
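+    // the new nodes introduce a new rack /l1/d1/r3 and a new inner node
+    // /l1/d3 that did not exist in the original topology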
+
+    for (int i = 0; i<4; i++) {
+      DatanodeStorageInfo dsi = DFSTestUtil.createDatanodeStorageInfo(
+          "s" + newHost[i], newips[i], newRack[i], newHost[i],
+          newTypes[i], null);
+      newDD[i] = dsi.getDatanodeDescriptor();
+      CLUSTER.add(newDD[i]);
+    }
+
+    DFSTopologyNodeImpl d1 =
+        (DFSTopologyNodeImpl) CLUSTER.getNode("/l1/d1");
+    HashMap<String, EnumMap<StorageType, Integer>> d1info =
+        d1.getChildrenStorageInfo();
+    assertEquals(3, d1info.keySet().size());
+    assertTrue(d1info.get("r1").size() == 2 && d1info.get("r2").size() == 2
+      && d1info.get("r3").size() == 1);
+    assertEquals(2, (int)d1info.get("r1").get(StorageType.DISK));
+    assertEquals(1, (int)d1info.get("r1").get(StorageType.ARCHIVE));
+    assertEquals(2, (int)d1info.get("r2").get(StorageType.DISK));
+    assertEquals(1, (int)d1info.get("r2").get(StorageType.ARCHIVE));
+    assertEquals(1, (int)d1info.get("r3").get(StorageType.SSD));
+
+    DFSTopologyNodeImpl d3 =
+        (DFSTopologyNodeImpl) CLUSTER.getNode("/l1/d3");
+    HashMap<String, EnumMap<StorageType, Integer>> d3info =
+        d3.getChildrenStorageInfo();
+    assertEquals(1, d3info.keySet().size());
+    assertTrue(d3info.get("r3").size() == 1);
+    assertEquals(2, (int)d3info.get("r3").get(StorageType.SSD));
+
+    DFSTopologyNodeImpl l1 =
+        (DFSTopologyNodeImpl) CLUSTER.getNode("/l1");
+    HashMap<String, EnumMap<StorageType, Integer>> l1info =
+        l1.getChildrenStorageInfo();
+    assertEquals(3, l1info.keySet().size());
+    assertTrue(l1info.get("d1").size() == 3 &&
+        l1info.get("d2").size() == 3 && l1info.get("d3").size() == 1);
+    assertEquals(4, (int)l1info.get("d1").get(StorageType.DISK));
+    assertEquals(2, (int)l1info.get("d1").get(StorageType.ARCHIVE));
+    assertEquals(1, (int)l1info.get("d1").get(StorageType.SSD));
+    assertEquals(1, (int)l1info.get("d2").get(StorageType.SSD));
+    assertEquals(1, (int)l1info.get("d2").get(StorageType.RAM_DISK));
+    assertEquals(1, (int)l1info.get("d2").get(StorageType.DISK));
+    assertEquals(2, (int)l1info.get("d3").get(StorageType.SSD));
+
+
+    for (int i = 0; i<4; i++) {
+      CLUSTER.remove(newDD[i]);
+    }
+
+    // /l1/d1/r3 should have been removed, and /l1/d1/r1 should have been
+    // restored to its pre-addition state
+    DFSTopologyNodeImpl nd1 =
+        (DFSTopologyNodeImpl) CLUSTER.getNode("/l1/d1");
+    HashMap<String, EnumMap<StorageType, Integer>> nd1info =
+        nd1.getChildrenStorageInfo();
+    assertEquals(2, nd1info.keySet().size());
+    assertTrue(nd1info.get("r1").size() == 2 && nd1info.get("r2").size() == 2);
+    assertEquals(1, (int)nd1info.get("r1").get(StorageType.DISK));
+    assertEquals(1, (int)nd1info.get("r1").get(StorageType.ARCHIVE));
+    assertEquals(2, (int)nd1info.get("r2").get(StorageType.DISK));
+    assertEquals(1, (int)nd1info.get("r2").get(StorageType.ARCHIVE));
+
+    // /l1/d3 should have been removed, and /l1/d1 should have been restored
+    // to its pre-addition state
+    DFSTopologyNodeImpl nl1 =
+        (DFSTopologyNodeImpl) CLUSTER.getNode("/l1");
+    HashMap<String, EnumMap<StorageType, Integer>> nl1info =
+        nl1.getChildrenStorageInfo();
+    assertEquals(2, nl1info.keySet().size());
+    assertTrue(l1info.get("d1").size() == 2
+        && l1info.get("d2").size() == 3);
+    assertEquals(2, (int)nl1info.get("d1").get(StorageType.ARCHIVE));
+    assertEquals(3, (int)nl1info.get("d1").get(StorageType.DISK));
+    assertEquals(1, (int)l1info.get("d2").get(StorageType.DISK));
+    assertEquals(1, (int)l1info.get("d2").get(StorageType.RAM_DISK));
+    assertEquals(1, (int)l1info.get("d2").get(StorageType.SSD));
+
+    assertNull(CLUSTER.getNode("/l1/d3"));
+  }
+
+  @Test
+  public void testChooseRandomWithStorageType() throws Exception {
+    Node n;
+    DatanodeDescriptor dd;
+    // test that chooseRandom can return nodes of the desired storage type
+    // when no exclusions are given
+    Set<String> diskUnderL1 =
+        Sets.newHashSet("host2", "host4", "host5", "host6");
+    Set<String> archiveUnderL1 = Sets.newHashSet("host1", "host3");
+    Set<String> ramdiskUnderL1 = Sets.newHashSet("host7");
+    Set<String> ssdUnderL1 = Sets.newHashSet("host8");
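+    // the expected host sets above follow directly from the racks and storage
+    // types assigned in setupDatanodes() for the hosts under /l1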
+    for (int i = 0; i < 10; i++) {
+      n = CLUSTER.chooseRandomWithStorageType("/l1", null, null,
+          StorageType.DISK);
+      assertTrue(n instanceof DatanodeDescriptor);
+      dd = (DatanodeDescriptor) n;
+      assertTrue(diskUnderL1.contains(dd.getHostName()));
+
+      n = CLUSTER.chooseRandomWithStorageType("/l1", null, null,
+          StorageType.RAM_DISK);
+      assertTrue(n instanceof DatanodeDescriptor);
+      dd = (DatanodeDescriptor) n;
+      assertTrue(ramdiskUnderL1.contains(dd.getHostName()));
+
+      n = CLUSTER.chooseRandomWithStorageType("/l1", null, null,
+          StorageType.ARCHIVE);
+      assertTrue(n instanceof DatanodeDescriptor);
+      dd = (DatanodeDescriptor) n;
+      assertTrue(archiveUnderL1.contains(dd.getHostName()));
+
+      n = CLUSTER.chooseRandomWithStorageType("/l1", null, null,
+          StorageType.SSD);
+      assertTrue(n instanceof DatanodeDescriptor);
+      dd = (DatanodeDescriptor) n;
+      assertTrue(ssdUnderL1.contains(dd.getHostName()));
+    }
+  }
+
+  @Test
+  public void testChooseRandomWithStorageTypeWithExcluded() throws Exception {
+    Node n;
+    DatanodeDescriptor dd;
+    // the tests below choose random nodes with exclusions; under /l2/d3,
+    // every rack has exactly one host
+    // /l2/d3 has five racks r[1~5] but only r4 and r5 have ARCHIVE
+    // host12 is the one under "/l2/d3/r4", host13 is the one under "/l2/d3/r5"
+    n = CLUSTER.chooseRandomWithStorageType("/l2/d3/r4", null, null,
+        StorageType.ARCHIVE);
+    HashSet<Node> excluded = new HashSet<>();
+    // exclude the host on r4 (since there is only one host, no randomness here)
+    excluded.add(n);
+
+    for (int i = 0; i<10; i++) {
+      n = CLUSTER.chooseRandomWithStorageType("/l2/d3", null, null,
+          StorageType.ARCHIVE);
+      assertTrue(n instanceof DatanodeDescriptor);
+      dd = (DatanodeDescriptor) n;
+      assertTrue(dd.getHostName().equals("host12") ||
+          dd.getHostName().equals("host13"));
+    }
+
+    // test exclude nodes
+    for (int i = 0; i<10; i++) {
+      n = CLUSTER.chooseRandomWithStorageType("/l2/d3", null, excluded,
+          StorageType.ARCHIVE);
+      assertTrue(n instanceof DatanodeDescriptor);
+      dd = (DatanodeDescriptor) n;
+      assertTrue(dd.getHostName().equals("host13"));
+    }
+
+    // test exclude scope
+    for (int i = 0; i<10; i++) {
+      n = CLUSTER.chooseRandomWithStorageType("/l2/d3", "/l2/d3/r4", null,
+          StorageType.ARCHIVE);
+      assertTrue(n instanceof DatanodeDescriptor);
+      dd = (DatanodeDescriptor) n;
+      assertTrue(dd.getHostName().equals("host13"));
+    }
+
+    // test exclude scope + excluded node with expected null return node
+    for (int i = 0; i<10; i++) {
+      n = CLUSTER.chooseRandomWithStorageType("/l2/d3", "/l2/d3/r5", excluded,
+          StorageType.ARCHIVE);
+      assertNull(n);
+    }
+
+    // test exclude scope + excluded node with expected non-null return node
+    n = CLUSTER.chooseRandomWithStorageType("/l1/d2", null, null,
+        StorageType.DISK);
+    dd = (DatanodeDescriptor)n;
+    assertEquals("host6", dd.getHostName());
+    // exclude host6, the only DISK node under /l1/d2 (so the call above has
+    // no randomness)
+    excluded.add(n);
+    Set<String> expectedSet = Sets.newHashSet("host4", "host5");
+    for (int i = 0; i<10; i++) {
+      // under /l1, there are four hosts with DISK:
+      // /l1/d1/r1/host2, /l1/d1/r2/host4, /l1/d1/r2/host5 and /l1/d2/r3/host6
+      // host6 is in excluded, and host2 falls under the excluded scope
+      // /l1/d1/r1, so the result should always be host4 or host5
+      n = CLUSTER.chooseRandomWithStorageType(
+          "/l1", "/l1/d1/r1", excluded, StorageType.DISK);
+      dd = (DatanodeDescriptor) n;
+      assertTrue(expectedSet.contains(dd.getHostName()));
+    }
+  }
+
+
+  /**
+   * This test exercises the wrapper method, which takes a single scope
+   * string: if it starts with ~, it is treated as an excluded scope and the
+   * search runs from the root; otherwise it is the scope to search within.
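+   * For illustration, a call mirroring the ones used in this test:
+   * chooseRandomWithStorageType("~/l2/d4", null, StorageType.RAM_DISK)
+   * returns a random RAM_DISK node from anywhere in the tree except the
+   * subtree under /l2/d4.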
+   * @throws Exception throws exception.
+   */
+  @Test
+  public void testChooseRandomWithStorageTypeWrapper() throws Exception {
+    Node n;
+    DatanodeDescriptor dd;
+    n = CLUSTER.chooseRandomWithStorageType("/l2/d3/r4", null, null,
+        StorageType.ARCHIVE);
+    HashSet<Node> excluded = new HashSet<>();
+    // exclude the host on r4 (since there is only one host, no randomness here)
+    excluded.add(n);
+
+    // search with given scope being desired scope
+    for (int i = 0; i<10; i++) {
+      n = CLUSTER.chooseRandomWithStorageType(
+          "/l2/d3", null, StorageType.ARCHIVE);
+      assertTrue(n instanceof DatanodeDescriptor);
+      dd = (DatanodeDescriptor) n;
+      assertTrue(dd.getHostName().equals("host12") ||
+          dd.getHostName().equals("host13"));
+    }
+
+    for (int i = 0; i<10; i++) {
+      n = CLUSTER.chooseRandomWithStorageType(
+          "/l2/d3", excluded, StorageType.ARCHIVE);
+      assertTrue(n instanceof DatanodeDescriptor);
+      dd = (DatanodeDescriptor) n;
+      assertTrue(dd.getHostName().equals("host13"));
+    }
+
+    // search with given scope being exclude scope
+
+    // a total of 4 ram_disk nodes:
+    // /l1/d2/r3/host7, /l2/d3/r2/host10, /l2/d4/r1/host16 and /l2/d4/r1/host17
+    // so if we exclude /l2/d4, the result should always be either host7 or
+    // host10
+    for (int i = 0; i<10; i++) {
+      n = CLUSTER.chooseRandomWithStorageType(
+          "~/l2/d4", null, StorageType.RAM_DISK);
+      assertTrue(n instanceof DatanodeDescriptor);
+      dd = (DatanodeDescriptor) n;
+      assertTrue(dd.getHostName().equals("host7") ||
+          dd.getHostName().equals("host10"));
+    }
+
+    // similar to above, except that host10 is also excluded, so the result
+    // should always be host7
+    n = CLUSTER.chooseRandomWithStorageType("/l2/d3/r2", null, null,
+        StorageType.RAM_DISK);
+    // add host10 to exclude
+    excluded.add(n);
+    for (int i = 0; i<10; i++) {
+      n = CLUSTER.chooseRandomWithStorageType(
+          "~/l2/d4", excluded, StorageType.RAM_DISK);
+      assertTrue(n instanceof DatanodeDescriptor);
+      dd = (DatanodeDescriptor) n;
+      assertTrue(dd.getHostName().equals("host7"));
+    }
+  }
+
+  @Test
+  public void testNonExistingNode() throws Exception {
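+    // requesting a scope that does not exist in the topology should return
+    // null rather than throw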
+    Node n;
+    n = CLUSTER.chooseRandomWithStorageType(
+        "/l100", null, null, StorageType.DISK);
+    assertNull(n);
+    n = CLUSTER.chooseRandomWithStorageType(
+        "/l100/d100", null, null, StorageType.DISK);
+    assertNull(n);
+    n = CLUSTER.chooseRandomWithStorageType(
+        "/l100/d100/r100", null, null, StorageType.DISK);
+    assertNull(n);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org