Posted to common-commits@hadoop.apache.org by sa...@apache.org on 2019/03/18 09:45:01 UTC

[hadoop] branch trunk updated: HDDS-699. Detect Ozone Network topology. Contributed by Sammi Chen.

This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4d2a116  HDDS-699. Detect Ozone Network topology. Contributed by Sammi Chen.
4d2a116 is described below

commit 4d2a116d6ef865c29d0df0a743e91874942af412
Author: Sammi Chen <sa...@apache.org>
AuthorDate: Sun Mar 17 17:07:57 2019 +0800

    HDDS-699. Detect Ozone Network topology. Contributed by Sammi Chen.
---
 .../org/apache/hadoop/hdds/scm/ScmConfigKeys.java  |   6 +
 .../org/apache/hadoop/hdds/scm/net/InnerNode.java  |  84 ++
 .../apache/hadoop/hdds/scm/net/InnerNodeImpl.java  | 495 +++++++++++
 .../apache/hadoop/hdds/scm/net/NetConstants.java   |  67 ++
 .../org/apache/hadoop/hdds/scm/net/NetUtils.java   | 156 ++++
 .../hadoop/hdds/scm/net/NetworkTopology.java       | 250 ++++++
 .../hadoop/hdds/scm/net/NetworkTopologyImpl.java   | 778 +++++++++++++++++
 .../java/org/apache/hadoop/hdds/scm/net/Node.java  |  89 ++
 .../org/apache/hadoop/hdds/scm/net/NodeImpl.java   | 200 +++++
 .../org/apache/hadoop/hdds/scm/net/NodeSchema.java | 148 ++++
 .../hadoop/hdds/scm/net/NodeSchemaLoader.java      | 388 +++++++++
 .../hadoop/hdds/scm/net/NodeSchemaManager.java     | 136 +++
 .../apache/hadoop/hdds/scm/net/package-info.java   |  21 +
 .../main/resources/network-topology-default.xml    |  68 ++
 .../main/resources/network-topology-nodegroup.xml  |  74 ++
 .../common/src/main/resources/ozone-default.xml    |   7 +
 .../hdds/scm/net/TestNetworkTopologyImpl.java      | 922 +++++++++++++++++++++
 .../hadoop/hdds/scm/net/TestNodeSchemaLoader.java  | 103 +++
 .../hadoop/hdds/scm/net/TestNodeSchemaManager.java | 101 +++
 .../networkTopologyTestFiles/enforce-error.xml     |  47 ++
 .../resources/networkTopologyTestFiles/good.xml    |  49 ++
 .../networkTopologyTestFiles/invalid-cost.xml      |  43 +
 .../networkTopologyTestFiles/invalid-version.xml   |  43 +
 .../networkTopologyTestFiles/multiple-leaf.xml     |  43 +
 .../networkTopologyTestFiles/multiple-root.xml     |  43 +
 .../networkTopologyTestFiles/multiple-topology.xml |  47 ++
 .../resources/networkTopologyTestFiles/no-leaf.xml |  43 +
 .../resources/networkTopologyTestFiles/no-root.xml |  43 +
 .../networkTopologyTestFiles/no-topology.xml       |  39 +
 .../path-layers-size-mismatch.xml                  |  43 +
 .../path-with-id-reference-failure.xml             |  43 +
 .../unknown-layer-type.xml                         |  43 +
 .../wrong-path-order-1.xml                         |  43 +
 .../wrong-path-order-2.xml                         |  43 +
 .../dist/dev-support/bin/dist-layout-stitching     |   2 +
 35 files changed, 4750 insertions(+)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index a45a169..7a3baff 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -354,6 +354,12 @@ public final class ScmConfigKeys {
   public static final String
       HDDS_SCM_HTTP_KERBEROS_KEYTAB_FILE_KEY =
       "hdds.scm.http.kerberos.keytab";
+
+  // Network topology
+  public static final String OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE =
+      "ozone.scm.network.topology.schema.file";
+  public static final String OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE_DEFAULT =
+      "network-topology-default.xml";
   /**
    * Never constructed.
    */
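
For illustration, the new key can also be set programmatically; a minimal
sketch, assuming an OzoneConfiguration is available (the nodegroup schema file
name below is just one of the bundled example values):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.hdds.scm.ScmConfigKeys;

    // Point SCM at a custom topology schema; SCM falls back to
    // network-topology-default.xml when the key is unset.
    Configuration conf = new OzoneConfiguration();
    conf.set(ScmConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE,
        "network-topology-nodegroup.xml");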
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/InnerNode.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/InnerNode.java
new file mode 100644
index 0000000..a185b01
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/InnerNode.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.net;
+
+import java.util.Collection;
+
+/**
+ * The interface defines an inner node in a network topology.
+ * An inner node represents network topology entities, such as data center,
+ * rack, switch or logical group.
+ */
+public interface InnerNode extends Node {
+  /** A factory interface to create new InnerNode instances. */
+  interface Factory<N extends InnerNode> {
+    /** Construct an InnerNode from name, location, parent, level and cost. */
+    N newInnerNode(String name, String location, InnerNode parent, int level,
+        int cost);
+  }
+
+  /**
+   * Add node <i>n</i> to the subtree of this node.
+   * @param n node to be added
+   * @return true if the node is added; false otherwise
+   */
+  boolean add(Node n);
+
+  /**
+   * Remove node <i>n</i> from the subtree of this node.
+   * @param n node to be deleted
+   */
+  void remove(Node n);
+
+  /**
+   * Given a node's string representation, return a reference to the node.
+   * @param loc string location of the format /dc/rack/nodegroup/node
+   * @return null if the node is not found
+   */
+  Node getNode(String loc);
+
+  /**
+   * @return the number of nodes in the subtree at level <i>level</i>. Here
+   * level is relative: level 1 means the node itself, level 2 means its
+   * direct children, and so on.
+   **/
+  int getNumOfNodes(int level);
+
+  /**
+   * Get the <i>leafIndex</i>-th leaf of this subtree.
+   *
+   * @param leafIndex an indexed leaf of the node
+   * @return the leaf node corresponding to the given index.
+   */
+  Node getLeaf(int leafIndex);
+
+  /**
+   * Get the <i>leafIndex</i>-th leaf of this subtree.
+   *
+   * @param leafIndex node's index, starting from 0, skipping the nodes in
+   *                  excludedScope and excludedNodes with ancestorGen
+   * @param excludedScope the excluded scope
+   * @param excludedNodes nodes to be excluded. If ancestorGen is not 0,
+   *                      the chosen node will not share same ancestor with
+   *                      those in excluded nodes at the specified generation
+   * @param ancestorGen ignored when the value is 0
+   * @return the leaf node corresponding to the given index
+   */
+  Node getLeaf(int leafIndex, String excludedScope,
+      Collection<Node> excludedNodes, int ancestorGen);
+}
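
A quick sketch of how this interface is meant to be used; the factory shown is
the package-private one from InnerNodeImpl below, so this only works from
within the same package, and the location strings are illustrative:

    // Build an empty tree rooted at "/" and query it.
    InnerNode root = InnerNodeImpl.FACTORY.newInnerNode(
        NetConstants.ROOT, null, null, NetConstants.ROOT_LEVEL,
        NetConstants.INNER_NODE_COST_DEFAULT);
    Node rack = root.getNode("/dc1/rack1"); // null until nodes are added
    Node leaf = root.getLeaf(0);            // also null while the tree is empty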
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/InnerNodeImpl.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/InnerNodeImpl.java
new file mode 100644
index 0000000..3f1351d
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/InnerNodeImpl.java
@@ -0,0 +1,495 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.net;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.hdds.scm.net.NetConstants.PATH_SEPARATOR_STR;
+import static org.apache.hadoop.hdds.scm.net.NetConstants.PATH_SEPARATOR;
+
+/**
+ * A thread safe class that implements InnerNode interface.
+ */
+public class InnerNodeImpl extends NodeImpl implements InnerNode {
+  protected static class Factory implements InnerNode.Factory<InnerNodeImpl> {
+    protected Factory() {}
+
+    public InnerNodeImpl newInnerNode(String name, String location,
+        InnerNode parent, int level, int cost) {
+      return new InnerNodeImpl(name, location, parent, level, cost);
+    }
+  }
+
+  static final Factory FACTORY = new Factory();
+  // a map from a node's network name to the node, for quick lookup while
+  // preserving insertion order
+  private final Map<String, Node> childrenMap = new LinkedHashMap<>();
+  // number of descendant leaves under this node
+  private int numOfLeaves;
+  // LOGGER
+  public static final Logger LOG = LoggerFactory.getLogger(InnerNodeImpl.class);
+
+  /**
+   * Construct an InnerNode from its name, network location, parent, level and
+   * its cost.
+   **/
+  protected InnerNodeImpl(String name, String location, InnerNode parent,
+      int level, int cost) {
+    super(name, location, parent, level, cost);
+  }
+
+  /** @return the number of children this node has */
+  private int getNumOfChildren() {
+    return childrenMap.size();
+  }
+
+  /** @return the number of leaf nodes under this node */
+  @Override
+  public int getNumOfLeaves() {
+    return numOfLeaves;
+  }
+
+  /**
+   * @return the number of nodes in the subtree at level <i>level</i>. Here
+   * level is relative: level 1 means the node itself, level 2 means its
+   * direct children, and so on.
+   **/
+  public int getNumOfNodes(int level) {
+    Preconditions.checkArgument(level > 0);
+    int count = 0;
+    if (level == 1) {
+      count += 1;
+    } else if (level == 2) {
+      count += getNumOfChildren();
+    } else {
+      for (Node node: childrenMap.values()) {
+        if (node instanceof InnerNode) {
+          count += ((InnerNode)node).getNumOfNodes(level - 1);
+        } else {
+          throw new RuntimeException("Cannot support Level:" + level +
+              " on this node " + this.toString());
+        }
+      }
+    }
+    return count;
+  }
+
+  /**
+   * Judge if this node is the direct parent of leaf nodes.
+   * @return true if this node's children are leaf nodes
+   */
+  private boolean isLeafParent() {
+    if (childrenMap.isEmpty()) {
+      return true;
+    }
+    Node child = childrenMap.values().iterator().next();
+    return child instanceof InnerNode ? false : true;
+  }
+
+  /**
+   * Judge if this node is the parent of node <i>node</i>.
+   * @param node a node
+   * @return true if this node is the parent of <i>node</i>
+   */
+  private boolean isParent(Node node) {
+    return node.getNetworkLocation().equals(this.getNetworkFullPath());
+  }
+
+  /**
+   * Add node <i>node</i> to the subtree of this node.
+   * @param node node to be added
+   * @return true if the node is added, false if it is only updated
+   */
+  public boolean add(Node node) {
+    if (!isAncestor(node)) {
+      throw new IllegalArgumentException(node.getNetworkName()
+          + ", which is located at " + node.getNetworkLocation()
+          + ", is not a descendant of " + this.getNetworkFullPath());
+    }
+    if (isParent(node)) {
+      // this node is the parent, then add it directly
+      node.setParent(this);
+      node.setLevel(this.getLevel() + 1);
+      Node current = childrenMap.put(node.getNetworkName(), node);
+      if (current != null) {
+        return false;
+      }
+    } else {
+      // find the next level ancestor node
+      String ancestorName = getNextLevelAncestorName(node);
+      InnerNode childNode = (InnerNode)childrenMap.get(ancestorName);
+      if (childNode == null) {
+        // create a new InnerNode for this ancestor node
+        childNode = createChildNode(ancestorName);
+        childrenMap.put(childNode.getNetworkName(), childNode);
+      }
+      // add node to the subtree of the next ancestor node
+      if (!childNode.add(node)) {
+        return false;
+      }
+    }
+    numOfLeaves++;
+    return true;
+  }
+
+  /**
+   * Remove node <i>node</i> from the subtree of this node.
+   * @param node node to be deleted
+   */
+  public void remove(Node node) {
+    if (!isAncestor(node)) {
+      throw new IllegalArgumentException(node.getNetworkName()
+          + ", which is located at " + node.getNetworkLocation()
+          + ", is not a descendant of " + this.getNetworkFullPath());
+    }
+    if (isParent(node)) {
+      // this node is the parent, remove it directly
+      if (childrenMap.containsKey(node.getNetworkName())) {
+        childrenMap.remove(node.getNetworkName());
+        node.setParent(null);
+      } else {
+        throw new RuntimeException("Should not come to here. Node:" +
+            node.getNetworkFullPath() + ", Parent:" +
+            this.getNetworkFullPath());
+      }
+    } else {
+      // find the next ancestor node
+      String ancestorName = getNextLevelAncestorName(node);
+      InnerNodeImpl childNode = (InnerNodeImpl)childrenMap.get(ancestorName);
+      Preconditions.checkNotNull(childNode, "InnerNode is deleted before leaf");
+      // remove node from the parent node
+      childNode.remove(node);
+      // if the parent node has no children, remove the parent node too
+      if (childNode.getNumOfChildren() == 0) {
+        childrenMap.remove(ancestorName);
+      }
+    }
+    numOfLeaves--;
+  }
+
+  /**
+   * Given a node's string representation, return a reference to the node.
+   * Node can be leaf node or inner node.
+   *
+   * @param loc string location of a node. If loc starts with "/", it's an
+   *            absolute path, otherwise a relative path. The following
+   *            examples are all accepted:
+   *            1.  /dc1/rm1/rack1          -> an inner node
+   *            2.  /dc1/rm1/rack1/node1    -> a leaf node
+   *            3.  rack1/node1             -> a relative path to this node
+   *
+   * @return null if the node is not found
+   */
+  public Node getNode(String loc) {
+    if (loc == null) {
+      return null;
+    }
+
+    String fullPath = this.getNetworkFullPath();
+    if (loc.equalsIgnoreCase(fullPath)) {
+      return this;
+    }
+
+    // remove current node's location from loc when it's an absolute path
+    if (fullPath.equals(NetConstants.PATH_SEPARATOR_STR)) {
+      // current node is ROOT
+      if (loc.startsWith(PATH_SEPARATOR_STR)) {
+        loc = loc.substring(1);
+      }
+    } else if (loc.startsWith(fullPath)) {
+      loc = loc.substring(fullPath.length());
+      // skip the separator "/"
+      loc = loc.substring(1);
+    }
+
+    String[] path = loc.split(PATH_SEPARATOR_STR, 2);
+    Node child = childrenMap.get(path[0]);
+    if (child == null) {
+      return null;
+    }
+    if (path.length == 1) {
+      return child;
+    }
+    if (child instanceof InnerNode) {
+      return ((InnerNode)child).getNode(path[1]);
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Get the <i>leafIndex</i>-th leaf of this subtree.
+   *
+   * @param leafIndex an indexed leaf of the node
+   * @return the leaf node corresponding to the given index.
+   */
+  public Node getLeaf(int leafIndex) {
+    Preconditions.checkArgument(leafIndex >= 0);
+    // children are leaves
+    if (isLeafParent()) {
+      // range check
+      if (leafIndex >= getNumOfChildren()) {
+        return null;
+      }
+      return getChildNode(leafIndex);
+    } else {
+      for (Node node : childrenMap.values()) {
+        InnerNodeImpl child = (InnerNodeImpl)node;
+        int leafCount = child.getNumOfLeaves();
+        if (leafIndex < leafCount) {
+          return child.getLeaf(leafIndex);
+        } else {
+          leafIndex -= leafCount;
+        }
+      }
+      return null;
+    }
+  }
+
+  /**
+   * Get the <i>leafIndex</i>-th leaf of this subtree.
+   *
+   * @param leafIndex node's index, starting from 0, skipping the nodes in
+   *                 excludedScope and excludedNodes with ancestorGen
+   * @param excludedScope the excluded scope
+   * @param excludedNodes nodes to be excluded. If ancestorGen is not 0,
+   *                      the chosen node will not share the same ancestor with
+   *                      those in excluded nodes at the specified generation
+   * @param ancestorGen applies to excludedNodes; when the value is 0, no
+   *                    same-ancestor enforcement is applied to excludedNodes
+   * @return the leaf node corresponding to the given index.
+   * Example:
+   *
+   *                                /  --- root
+   *                              /  \
+   *                             /    \
+   *                            /      \
+   *                           /        \
+   *                         dc1         dc2
+   *                        / \         / \
+   *                       /   \       /   \
+   *                      /     \     /     \
+   *                    rack1 rack2  rack1  rack2
+   *                   / \     / \  / \     / \
+   *                 n1  n2  n3 n4 n5  n6  n7 n8
+   *
+   *   Input:
+   *   leafIndex = 2
+   *   excludedScope = /dc2/rack2
+   *   excludedNodes = {/dc1/rack1/n1}
+   *   ancestorGen = 1
+   *
+   *   Output:
+   *   node /dc2/rack1/n5
+   *
+   *   Explanation:
+   *   Since excludedNodes is n1 and ancestorGen is 1, the nodes under
+   *   /root/dc1/rack1 (n1 and n2) are excluded, and excludedScope /dc2/rack2
+   *   excludes n7 and n8. Given that leafIndex starts from 0, leafIndex 2
+   *   means picking the 3rd available node, which is n5.
+   *
+   */
+  public Node getLeaf(int leafIndex, String excludedScope,
+      Collection<Node> excludedNodes, int ancestorGen) {
+    Preconditions.checkArgument(leafIndex >= 0 && ancestorGen >= 0);
+    // come to leaf parent layer
+    if (isLeafParent()) {
+      return getLeafOnLeafParent(leafIndex, excludedScope, excludedNodes);
+    }
+
+    int maxLevel = NodeSchemaManager.getInstance().getMaxLevel();
+    // the generation of this node's children, counted as ancestors of the
+    // leaf nodes
+    int currentGen = maxLevel - this.getLevel() - 1;
+    // build a map from each ancestor (a child of this node) to its excluded
+    // node count
+    Map<Node, Integer> countMap =
+        getAncestorCountMap(excludedNodes, ancestorGen, currentGen);
+    // nodes covered by excluded scope
+    int excludedNodeCount = getExcludedScopeNodeCount(excludedScope);
+
+    for (Node child : childrenMap.values()) {
+      int leafCount = child.getNumOfLeaves();
+      // skip nodes covered by excluded scope
+      if (excludedScope != null &&
+          excludedScope.startsWith(child.getNetworkFullPath())) {
+        leafCount -= excludedNodeCount;
+      }
+      // skip nodes covered by excluded nodes and ancestorGen
+      Integer count = countMap.get(child);
+      if (count != null) {
+        leafCount -= count;
+      }
+      if (leafIndex < leafCount) {
+        return ((InnerNode)child).getLeaf(leafIndex, excludedScope,
+            excludedNodes, ancestorGen);
+      } else {
+        leafIndex -= leafCount;
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public boolean equals(Object to) {
+    if (to == null) {
+      return false;
+    }
+    if (this == to) {
+      return true;
+    }
+    return this.toString().equals(to.toString());
+  }
+
+  @Override
+  public int hashCode() {
+    return super.hashCode();
+  }
+
+  /**
+   * Get a map from each ancestor to its excluded node count.
+   *
+   * @param nodes a collection of leaf nodes to exclude
+   * @param genToExclude  the ancestor generation to exclude
+   * @param genToReturn  the ancestor generation to return the count map
+   * @return the map.
+   * example:
+   *
+   *                *  --- root
+   *              /    \
+   *             *      *   -- genToReturn =2
+   *            / \    / \
+   *          *   *   *   *  -- genToExclude = 1
+   *         /\  /\  /\  /\
+   *       *  * * * * * * *  -- nodes
+   */
+  private Map<Node, Integer> getAncestorCountMap(Collection<Node> nodes,
+      int genToExclude, int genToReturn) {
+    Preconditions.checkState(genToExclude >= 0);
+    Preconditions.checkState(genToReturn >= 0);
+
+    if (nodes == null || nodes.size() == 0) {
+      return Collections.emptyMap();
+    }
+    // with the recursive call, genToReturn can be smaller than genToExclude
+    if (genToReturn < genToExclude) {
+      genToExclude = genToReturn;
+    }
+    // ancestorToExclude to ancestorToReturn map
+    HashMap<Node, Node> ancestorMap = new HashMap<>();
+    for (Node node: nodes) {
+      Node ancestorToExclude = node.getAncestor(genToExclude);
+      Node ancestorToReturn = node.getAncestor(genToReturn);
+      if (ancestorToExclude == null || ancestorToReturn == null) {
+        LOG.warn("Ancestor not found, node: " + node.getNetworkFullPath() +
+            ", generation to exclude: " + genToExclude +
+            ", generation to return:" + genToReturn);
+        continue;
+      }
+      ancestorMap.put(ancestorToExclude, ancestorToReturn);
+    }
+    // ancestorToReturn to exclude node count map
+    HashMap<Node, Integer> countMap = new HashMap<>();
+    for (Map.Entry<Node, Node> entry : ancestorMap.entrySet()) {
+      countMap.compute(entry.getValue(),
+          (key, n) -> (n == null ? 0 : n) + entry.getKey().getNumOfLeaves());
+    }
+
+    return countMap;
+  }
+
+  /**
+   *  Get the node at leafIndex, skipping the nodes in excludedScope and in
+   *  the excludedNodes list.
+   */
+  private Node getLeafOnLeafParent(int leafIndex, String excludedScope,
+      Collection<Node> excludedNodes) {
+    Preconditions.checkArgument(isLeafParent() && leafIndex >= 0);
+    if (leafIndex >= getNumOfChildren()) {
+      return null;
+    }
+    for (Node node : childrenMap.values()) {
+      if ((excludedNodes != null && (excludedNodes.contains(node))) ||
+          (excludedScope != null &&
+              (node.getNetworkFullPath().startsWith(excludedScope)))) {
+        continue;
+      }
+      if (leafIndex == 0) {
+        return node;
+      }
+      leafIndex--;
+    }
+    return null;
+  }
+
+  /**
+   *  Return the name of this node's child that is an ancestor of node <i>n</i>.
+   */
+  private String getNextLevelAncestorName(Node n) {
+    int parentPathLen = this.getNetworkFullPath().length();
+    String name = n.getNetworkLocation().substring(parentPathLen);
+    if (name.charAt(0) == PATH_SEPARATOR) {
+      name = name.substring(1);
+    }
+    int index = name.indexOf(PATH_SEPARATOR);
+    if (index != -1) {
+      name = name.substring(0, index);
+    }
+    return name;
+  }
+
+  /**
+   * Creates a child node to be added to the list of children.
+   * @param name The name of the child node
+   * @return A new inner node
+   * @see #InnerNodeImpl(String, String, InnerNode, int, int)
+   */
+  private InnerNodeImpl createChildNode(String name) {
+    int childLevel = this.getLevel() + 1;
+    int cost = NodeSchemaManager.getInstance().getCost(childLevel);
+    return new InnerNodeImpl(name, this.getNetworkFullPath(), this, childLevel,
+        cost);
+  }
+
+  /** Get node with index <i>index</i>. */
+  private Node getChildNode(int index) {
+    Iterator<Node> iterator = childrenMap.values().iterator();
+    Node node = null;
+    while (index >= 0 && iterator.hasNext()) {
+      node = iterator.next();
+      index--;
+    }
+    return node;
+  }
+
+  /** Get how many leaf nodes are covered by the excludedScope. */
+  private int getExcludedScopeNodeCount(String excludedScope) {
+    if (excludedScope == null) {
+      return 0;
+    }
+    Node excludedScopeNode = getNode(excludedScope);
+    return excludedScopeNode == null ? 0 : excludedScopeNode.getNumOfLeaves();
+  }
+}
\ No newline at end of file
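
To see the getLeaf() exclusion arithmetic from the javadoc example end to end,
a minimal sketch; buildExampleTopology() is a hypothetical helper that returns
the root of the 8-leaf example tree, since leaf construction (NodeImpl) is
defined elsewhere in this patch:

    // Hypothetical: root of /dc{1,2}/rack{1,2} with leaves n1..n8.
    InnerNode root = buildExampleTopology();
    Collection<Node> excluded =
        Collections.singleton(root.getNode("/dc1/rack1/n1"));
    // ancestorGen = 1 excludes n1's whole rack (n1, n2); the excluded
    // scope /dc2/rack2 drops n7 and n8. Index 2 then lands on n5.
    Node chosen = root.getLeaf(2, "/dc2/rack2", excluded, 1);
    assert "/dc2/rack1/n5".equals(chosen.getNetworkFullPath());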
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetConstants.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetConstants.java
new file mode 100644
index 0000000..0e1b076
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetConstants.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.net;
+
+import org.apache.hadoop.hdds.scm.net.NodeSchema.LayerType;
+
+/**
+ * Class to hold network topology related constants and configurations.
+ */
+public final class NetConstants {
+  private NetConstants() {
+    // Prevent instantiation
+  }
+  public static final char PATH_SEPARATOR = '/';
+  /** Path separator as a string. */
+  public static final String PATH_SEPARATOR_STR = "/";
+  public static final String SCOPE_REVERSE_STR = "~";
+  /** String representation of root. */
+  public static final String ROOT = "";
+  public static final int INNER_NODE_COST_DEFAULT = 1;
+  public static final int NODE_COST_DEFAULT = 0;
+  public static final int ANCESTOR_GENERATION_DEFAULT = 0;
+  public static final int ROOT_LEVEL = 1;
+  public static final String NODE_COST_PREFIX = "$";
+  public static final String DEFAULT_RACK = "/default-rack";
+  public static final String DEFAULT_NODEGROUP = "/default-nodegroup";
+  public static final String DEFAULT_DATACENTER = "/default-datacenter";
+  public static final String DEFAULT_REGION = "/default-dataregion";
+
+  // Build-in network topology node schema
+  public static final NodeSchema ROOT_SCHEMA =
+      new NodeSchema.Builder().setType(LayerType.ROOT).build();
+
+  public static final NodeSchema REGION_SCHEMA =
+      new NodeSchema.Builder().setType(LayerType.INNER_NODE)
+          .setDefaultName(DEFAULT_REGION).build();
+
+  public static final NodeSchema DATACENTER_SCHEMA =
+      new NodeSchema.Builder().setType(LayerType.INNER_NODE)
+          .setDefaultName(DEFAULT_DATACENTER).build();
+
+  public static final NodeSchema RACK_SCHEMA =
+      new NodeSchema.Builder().setType(LayerType.INNER_NODE)
+          .setDefaultName(DEFAULT_RACK).build();
+
+  public static final NodeSchema NODEGROUP_SCHEMA =
+      new NodeSchema.Builder().setType(LayerType.INNER_NODE)
+          .setDefaultName(DEFAULT_NODEGROUP).build();
+
+  public static final NodeSchema LEAF_SCHEMA =
+      new NodeSchema.Builder().setType(LayerType.LEAF_NODE).build();
+}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetUtils.java
new file mode 100644
index 0000000..501a9ea
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetUtils.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.net;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Utility class to facilitate network topology functions.
+ */
+public final class NetUtils {
+  public static final Logger LOG = LoggerFactory.getLogger(NetUtils.class);
+  private NetUtils() {
+    // Prevent instantiation
+  }
+  /**
+   * Normalize a path by stripping off any trailing
+   * {@link NetConstants#PATH_SEPARATOR}.
+   * @param path path to normalize.
+   * @return the normalized path.
+   * If <i>path</i> is empty or null, then {@link NetConstants#ROOT} is
+   * returned
+   */
+  public static String normalize(String path) {
+    if (path == null || path.length() == 0) {
+      return NetConstants.ROOT;
+    }
+
+    if (path.charAt(0) != NetConstants.PATH_SEPARATOR) {
+      throw new IllegalArgumentException(
+          "Network Location path does not start with "
+              + NetConstants.PATH_SEPARATOR_STR + ": " + path);
+    }
+
+    // Remove any trailing NetConstants.PATH_SEPARATOR
+    return path.length() == 1 ? path :
+        path.replaceAll(NetConstants.PATH_SEPARATOR_STR + "+$", "");
+  }
+
+  /**
+   *  Given a network topology location string, return its network topology
+   *  depth, e.g. the depth of /dc1/rack1/ng1/node1 is 5.
+   */
+  public static int locationToDepth(String location) {
+    String newLocation = normalize(location);
+    return newLocation.equals(NetConstants.PATH_SEPARATOR_STR) ? 1 :
+        newLocation.split(NetConstants.PATH_SEPARATOR_STR).length;
+  }
+
+
+  /**
+   *  Remove node from mutableExcludedNodes if it's covered by excludedScope.
+   *  Please note that the content of mutableExcludedNodes might be changed
+   *  by this call.
+   * @return the new excludedScope
+   */
+  public static String removeDuplicate(NetworkTopology topology,
+      Collection<Node> mutableExcludedNodes, String excludedScope,
+      int ancestorGen) {
+    if (mutableExcludedNodes == null || mutableExcludedNodes.size() == 0 ||
+        excludedScope == null || topology == null) {
+      return excludedScope;
+    }
+
+    Iterator<Node> iterator = mutableExcludedNodes.iterator();
+    while (iterator.hasNext()) {
+      Node node = iterator.next();
+      Node ancestor = topology.getAncestor(node, ancestorGen);
+      if (ancestor == null) {
+        LOG.warn("Fail to get ancestor generation " + ancestorGen +
+            " of node :" + node);
+        continue;
+      }
+      if (excludedScope.startsWith(ancestor.getNetworkFullPath())) {
+        // reset excludedScope if it's covered by exclude node's ancestor
+        return null;
+      }
+      if (ancestor.getNetworkFullPath().startsWith(excludedScope)) {
+        // remove exclude node if it's covered by excludedScope
+        iterator.remove();
+      }
+    }
+    return excludedScope;
+  }
+
+  /**
+   *  Remove node from mutableExcludedNodes if it's not part of scope.
+   *  Please note that the content of mutableExcludedNodes might be changed
+   *  by this call.
+   */
+  public static void removeOutscope(Collection<Node> mutableExcludedNodes,
+      String scope) {
+    if (mutableExcludedNodes == null || scope == null) {
+      return;
+    }
+    synchronized (mutableExcludedNodes) {
+      Iterator<Node> iterator = mutableExcludedNodes.iterator();
+      while (iterator.hasNext()) {
+        Node next = iterator.next();
+        if (!next.getNetworkFullPath().startsWith(scope)) {
+          iterator.remove();
+        }
+      }
+    }
+  }
+
+  /**
+   * Get an ancestor list for nodes at generation <i>generation</i>.
+   *
+   * @param nodes a collection of leaf nodes
+   * @param generation  the ancestor generation
+   * @return the ancestor list. If no ancestor is found, then an empty list is
+   * returned.
+   */
+  public static List<Node> getAncestorList(NetworkTopology topology,
+      Collection<Node> nodes, int generation) {
+    List<Node> ancestorList = new ArrayList<>();
+    if (topology == null || nodes == null || nodes.size() == 0 ||
+        generation == 0) {
+      return ancestorList;
+    }
+    Iterator<Node> iterator = nodes.iterator();
+    while (iterator.hasNext()) {
+      Node node = iterator.next();
+      Node ancestor = topology.getAncestor(node, generation);
+      if (ancestor == null) {
+        LOG.warn("Fail to get ancestor generation " + generation +
+            " of node :" + node);
+        continue;
+      }
+      if (!ancestorList.contains(ancestor)) {
+        ancestorList.add(ancestor);
+      }
+    }
+    return ancestorList;
+  }
+}
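
A couple of sanity examples for the path helpers above; the expected values
follow directly from the implementations:

    // Trailing separators are stripped; null/empty input maps to ROOT ("").
    assert NetUtils.normalize("/dc1/rack1///").equals("/dc1/rack1");
    assert NetUtils.normalize(null).equals(NetConstants.ROOT);
    // Depth counts the root level, so /dc1/rack1/ng1/node1 has depth 5.
    assert NetUtils.locationToDepth("/dc1/rack1/ng1/node1") == 5;
    assert NetUtils.locationToDepth("/") == 1;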
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopology.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopology.java
new file mode 100644
index 0000000..a3d3680
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopology.java
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.net;
+
+import java.util.Collection;
+
+/**
+ * The interface defines a network topology.
+ */
+public interface NetworkTopology {
+  /** Exception for invalid network topology detection. */
+  class InvalidTopologyException extends RuntimeException {
+    private static final long serialVersionUID = 1L;
+    public InvalidTopologyException(String msg) {
+      super(msg);
+    }
+  }
+  /**
+   * Add a leaf node. This will be called when a new datanode is added.
+   * @param node node to be added; cannot be null
+   * @exception IllegalArgumentException if adding a node to a leaf, or the
+   * node to be added is not a leaf
+   */
+  void add(Node node);
+
+
+  /**
+   * Remove a node from the network topology. This will be called when an
+   * existing datanode is removed from the system.
+   * @param node node to be removed; cannot be null
+   */
+  void remove(Node node);
+
+
+  /**
+   * Check if the tree already contains node <i>node</i>.
+   * @param node a node
+   * @return true if <i>node</i> is already in the tree; false otherwise
+   */
+  boolean contains(Node node);
+
+  /**
+   * Compare the direct parent of each node for equality.
+   * @return true if their parents are the same
+   */
+  boolean isSameParent(Node node1, Node node2);
+
+  /**
+   * Compare the specified ancestor generation of each node for equality.
+   * ancestorGen 1 means parent.
+   * @return true if their ancestors at the specified generation are equal
+   */
+  boolean isSameAncestor(Node node1, Node node2, int ancestorGen);
+
+
+  /**
+   * Get the ancestor for node on generation <i>ancestorGen</i>.
+   *
+   * @param node the node to get ancestor
+   * @param ancestorGen  the ancestor generation
+   * @return the ancestor. If no ancestor is found, then null is returned.
+   */
+  Node getAncestor(Node node, int ancestorGen);
+
+  /**
+   * Return the max level of this topology, starting from 1 for ROOT. For
+   * example,
+   * topology like "/rack/node" has the max level '3'.
+   */
+  int getMaxLevel();
+
+  /**
+   * Given a string representation of a node, return its reference.
+   * @param loc a path string representing a node, can be leaf or inner node
+   * @return a reference to the node; null if the node is not in the tree
+   */
+  Node getNode(String loc);
+
+  /**
+   * Given a string representation of an InnerNode, return its leaf node count.
+   * @param loc a path-like string representation of an InnerNode
+   * @return the number of leaf nodes, 0 if it's not an InnerNode or the node
+   * doesn't exist
+   */
+  int getNumOfLeafNode(String loc);
+
+  /**
+   * Return the node numbers at level <i>level</i>.
+   * @param level topology level, starting from 1, which means ROOT
+   * @return the number of nodes on the level
+   */
+  int getNumOfNodes(int level);
+
+  /**
+   * Randomly choose a node in the scope.
+   * @param scope range of nodes from which a node will be chosen. If scope
+   *              starts with ~, choose one from all nodes except for the
+   *              ones in <i>scope</i>; otherwise, choose one from <i>scope</i>.
+   * @return the chosen node
+   */
+  Node chooseRandom(String scope);
+
+  /**
+   * Randomly choose a node in the scope, and not in the excluded scope.
+   * @param scope range of nodes from which a node will be chosen. Cannot
+   *              start with ~
+   * @param excludedScope the chosen node cannot be in this range. Cannot
+   *                      start with ~
+   * @return the chosen node
+   */
+  Node chooseRandom(String scope, String excludedScope);
+
+  /**
+   * Randomly choose a leaf node from <i>scope</i>.
+   *
+   * If scope starts with ~, choose one from all nodes except for the
+   * ones in <i>scope</i>; otherwise, choose nodes from <i>scope</i>.
+   * If excludedNodes is given, choose a node that's not in excludedNodes.
+   *
+   * @param scope range of nodes from which a node will be chosen
+   * @param excludedNodes nodes to be excluded
+   *
+   * @return the chosen node
+   */
+  Node chooseRandom(String scope, Collection<Node> excludedNodes);
+
+  /**
+   * Randomly choose a leaf node from <i>scope</i>.
+   *
+   * If scope starts with ~, choose one from all nodes except for the
+   * ones in <i>scope</i>; otherwise, choose nodes from <i>scope</i>.
+   * If excludedNodes is given, choose a node that's not in excludedNodes.
+   *
+   * @param scope range of nodes from which a node will be chosen
+   * @param excludedNodes nodes to be excluded.
+   * @param ancestorGen matters when excludedNodes is not null. It is the
+   * ancestor generation that the chosen node is not allowed to share with the
+   * excludedNodes. For example, if ancestorGen is 1, the chosen node cannot
+   * share the same parent with the excludedNodes; if it is 2, they cannot
+   * share the same grandparent, and so on. If ancestorGen is 0, it has no
+   * effect.
+   *
+   * @return the chosen node
+   */
+  Node chooseRandom(String scope, Collection<Node> excludedNodes,
+      int ancestorGen);
+
+
+  /**
+   * Randomly choose a leaf node.
+   *
+   * @param scope range from which a node will be chosen, cannot start with ~
+   * @param excludedNodes nodes to be excluded
+   * @param excludedScope excluded node range. Cannot start with ~
+   * @param ancestorGen matters when excludedNodes is not null. It is the
+   * ancestor generation that the chosen node is not allowed to share with the
+   * excludedNodes. For example, if ancestorGen is 1, the chosen node cannot
+   * share the same parent with the excludedNodes; if it is 2, they cannot
+   * share the same grandparent, and so on. If ancestorGen is 0, it has no
+   * effect.
+   *
+   * @return the chosen node
+   */
+  Node chooseRandom(String scope, String excludedScope,
+      Collection<Node> excludedNodes, int ancestorGen);
+
+
+  /**
+   * Randomly choose one node from <i>scope</i> that shares the same
+   * generation ancestor with <i>affinityNode</i>, excluding nodes in
+   * <i>excludeScope</i> and <i>excludeNodes</i>.
+   *
+   * @param scope range of nodes from which a node will be chosen, cannot start
+   *              with ~
+   * @param excludedScope range of nodes to be excluded, cannot start with ~
+   * @param excludedNodes nodes to be excluded
+   * @param affinityNode  when not null, the chosen node should share the same
+   *                     ancestor with this node at generation ancestorGen.
+   *                      Ignored when value is null
+   * @param ancestorGen If 0, then no same-generation-ancestor enforcement on
+   *                     either excludedNodes or affinityNode. If greater than
+   *                     0, it applies to affinityNode (if not null), or to
+   *                     excludedNodes if affinityNode is null
+   * @return the chosen node
+   */
+  Node chooseRandom(String scope, String excludedScope,
+      Collection<Node> excludedNodes, Node affinityNode, int ancestorGen);
+
+  /**
+   * Choose the node at index <i>index</i> from <i>scope</i> that shares the
+   * same generation ancestor with <i>affinityNode</i>, excluding nodes in
+   * <i>excludeScope</i> and <i>excludeNodes</i>.
+   *
+   * @param leafIndex node index, exclude nodes in excludedScope and
+   *                  excludedNodes
+   * @param scope range of nodes from which a node will be chosen, cannot start
+   *              with ~
+   * @param excludedScope range of nodes to be excluded, cannot start with ~
+   * @param excludedNodes nodes to be excluded
+   * @param affinityNode  when not null, the chosen node should share the same
+   *                     ancestor with this node at generation ancestorGen.
+   *                      Ignored when value is null
+   * @param ancestorGen If 0, then no same-generation-ancestor enforcement on
+   *                     either excludedNodes or affinityNode. If greater than
+   *                     0, it applies to affinityNode (if not null), or to
+   *                     excludedNodes if affinityNode is null
+   * @return the chosen node
+   */
+  Node getNode(int leafIndex, String scope, String excludedScope,
+      Collection<Node> excludedNodes, Node affinityNode, int ancestorGen);
+
+  /** Return the distance cost between two nodes.
+   * The distance cost from one node to its parent is its parent's cost.
+   * The distance cost between two nodes is calculated by summing up their
+   * distance costs to their closest common ancestor.
+   * @param node1 one node
+   * @param node2 another node
+   * @return the distance cost between node1 and node2, which is zero if they
+   * are the same, or {@link Integer#MAX_VALUE} if node1 or node2 does not
+   * belong to the cluster
+   */
+  int getDistanceCost(Node node1, Node node2);
+
+  /**
+   * Sort the nodes array by network distance to <i>reader</i>, to reduce
+   * network traffic and improve performance.
+   *
+   * As an additional twist, we also randomize the nodes at each network
+   * distance. This helps with load balancing when there is data skew.
+   *
+   * @param reader    Node that needs the data
+   * @param nodes     Available replicas with the requested data
+   * @param activeLen Number of active nodes at the front of the array
+   */
+  void sortByDistanceCost(Node reader, Node[] nodes, int activeLen);
+}
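
A sketch of how a caller is expected to drive this interface; conf, the
datanode variable, reader, and replicas are placeholders for objects created
elsewhere (leaf Node construction is not part of this file):

    NetworkTopology topology = new NetworkTopologyImpl(conf);
    topology.add(datanode);  // only leaf nodes may be added
    // Scope semantics: a plain path restricts the choice to that subtree,
    // while a "~" prefix inverts it.
    Node inside = topology.chooseRandom("/dc1");   // some node under /dc1
    Node outside = topology.chooseRandom("~/dc1"); // any node not under /dc1
    // Order replicas so the cheapest ones (by distance cost) come first.
    topology.sortByDistanceCost(reader, replicas, replicas.length);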
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopologyImpl.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopologyImpl.java
new file mode 100644
index 0000000..d0b295f
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NetworkTopologyImpl.java
@@ -0,0 +1,778 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.net;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.TreeMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.conf.Configuration;
+import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT;
+import static org.apache.hadoop.hdds.scm.net.NetConstants.SCOPE_REVERSE_STR;
+import static org.apache.hadoop.hdds.scm.net.NetConstants.ANCESTOR_GENERATION_DEFAULT;
+
+/**
+ * The class represents a cluster of computers with a tree hierarchical
+ * network topology. In the network topology, leaves represent data nodes
+ * (computers) and inner nodes represent datacenters/core switches/routers
+ * that manage traffic in/out of data centers or racks.
+ */
+public class NetworkTopologyImpl implements NetworkTopology {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(NetworkTopology.class);
+
+  /** The InnerNode creation factory. */
+  private final InnerNode.Factory factory;
+  /** The root cluster tree. */
+  private final InnerNode clusterTree;
+  /** Depth of all leaf nodes. */
+  private final int maxLevel;
+  /** Schema manager. */
+  private final NodeSchemaManager schemaManager;
+  /** Lock to coordinate cluster tree access. */
+  private final ReadWriteLock netlock = new ReentrantReadWriteLock(true);
+
+  public NetworkTopologyImpl(Configuration conf) {
+    schemaManager = NodeSchemaManager.getInstance();
+    schemaManager.init(conf);
+    maxLevel = schemaManager.getMaxLevel();
+    factory = InnerNodeImpl.FACTORY;
+    clusterTree = factory.newInnerNode(ROOT, null, null,
+        NetConstants.ROOT_LEVEL,
+        schemaManager.getCost(NetConstants.ROOT_LEVEL));
+  }
+
+  @VisibleForTesting
+  public NetworkTopologyImpl(NodeSchemaManager manager) {
+    schemaManager = manager;
+    maxLevel = schemaManager.getMaxLevel();
+    factory = InnerNodeImpl.FACTORY;
+    clusterTree = factory.newInnerNode(ROOT, null, null,
+        NetConstants.ROOT_LEVEL,
+        schemaManager.getCost(NetConstants.ROOT_LEVEL));
+  }
+
+  /**
+   * Add a leaf node. This will be called when a new datanode is added.
+   * @param node node to be added; cannot be null
+   * @exception IllegalArgumentException if adding a node to a leaf, or the
+   * node to be added is not a leaf
+   */
+  public void add(Node node) {
+    Preconditions.checkArgument(node != null, "node cannot be null");
+    if (node instanceof InnerNode) {
+      throw new IllegalArgumentException(
+          "Not allowed to add an inner node: "+ node.getNetworkFullPath());
+    }
+    int newDepth = NetUtils.locationToDepth(node.getNetworkLocation()) + 1;
+
+    // Check depth
+    if (maxLevel != newDepth) {
+      throw new InvalidTopologyException("Failed to add " +
+          node.getNetworkFullPath() + ": Its path depth is not " + maxLevel);
+    }
+    netlock.writeLock().lock();
+    boolean add;
+    try {
+      add = clusterTree.add(node);
+    } finally {
+      netlock.writeLock().unlock();
+    }
+
+    if (add) {
+      LOG.info("Added a new node: " + node.getNetworkFullPath());
+      LOG.debug("NetworkTopology became:\n{}", this);
+    }
+  }
+
+  /**
+   * Remove a node from the network topology. This will be called when an
+   * existing datanode is removed from the system.
+   * @param node node to be removed; cannot be null
+   */
+  public void remove(Node node) {
+    Preconditions.checkArgument(node != null, "node cannot be null");
+    if (node instanceof InnerNode) {
+      throw new IllegalArgumentException(
+          "Not allowed to remove an inner node: "+ node.getNetworkFullPath());
+    }
+    netlock.writeLock().lock();
+    try {
+      clusterTree.remove(node);
+    } finally {
+      netlock.writeLock().unlock();
+    }
+    LOG.info("Removed a node: " + node.getNetworkFullPath());
+    LOG.debug("NetworkTopology became:\n{}", this);
+  }
+
+  /**
+   * Check if the tree already contains node <i>node</i>.
+   * @param node a node
+   * @return true if <i>node</i> is already in the tree; false otherwise
+   */
+  public boolean contains(Node node) {
+    Preconditions.checkArgument(node != null, "node cannot be null");
+    netlock.readLock().lock();
+    try {
+      Node parent = node.getParent();
+      while (parent != null && parent != clusterTree) {
+        parent = parent.getParent();
+      }
+      if (parent == clusterTree) {
+        return true;
+      }
+    } finally {
+      netlock.readLock().unlock();
+    }
+    return false;
+  }
+
+  /**
+   * Compare the specified ancestor generation of each node for equality.
+   * @return true if their ancestors at the specified generation are equal
+   */
+  public boolean isSameAncestor(Node node1, Node node2, int ancestorGen) {
+    if (node1 == null || node2 == null || ancestorGen <= 0) {
+      return false;
+    }
+    netlock.readLock().lock();
+    try {
+      return node1.getAncestor(ancestorGen) == node2.getAncestor(ancestorGen);
+    } finally {
+      netlock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Compare the direct parent of each node for equality.
+   * @return true if their parents are the same
+   */
+  public boolean isSameParent(Node node1, Node node2) {
+    if (node1 == null || node2 == null) {
+      return false;
+    }
+    netlock.readLock().lock();
+    try {
+      node1 = node1.getParent();
+      node2 = node2.getParent();
+      return node1 == node2;
+    } finally {
+      netlock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Get the ancestor for node on generation <i>ancestorGen</i>.
+   *
+   * @param node the node to get ancestor
+   * @param ancestorGen  the ancestor generation
+   * @return the ancestor. If no ancestor is found, then null is returned.
+   */
+  public Node getAncestor(Node node, int ancestorGen) {
+    if (node == null) {
+      return null;
+    }
+    netlock.readLock().lock();
+    try {
+      return node.getAncestor(ancestorGen);
+    } finally {
+      netlock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Given a string representation of a node (leaf or inner), return its
+   * reference.
+   * @param loc a path string representing a node, can be leaf or inner node
+   * @return a reference to the node, null if the node is not in the tree
+   */
+  public Node getNode(String loc) {
+    loc = NetUtils.normalize(loc);
+    netlock.readLock().lock();
+    try {
+      if (!ROOT.equals(loc)) {
+        return clusterTree.getNode(loc);
+      } else {
+        return clusterTree;
+      }
+    } finally {
+      netlock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Given a string representation of a node, return its leaf node count.
+   * @param loc a path-like string representation of Node
+   * @return the number of leaf nodes for InnerNode, 1 for leaf node, 0 if node
+   * doesn't exist
+   */
+  public int getNumOfLeafNode(String loc) {
+    netlock.readLock().lock();
+    try {
+      Node node = getNode(loc);
+      if (node != null) {
+        return node.getNumOfLeaves();
+      }
+    } finally {
+      netlock.readLock().unlock();
+    }
+    return 0;
+  }
+
+  /**
+   * Return the max level of this tree, starting from 1 for ROOT. For example,
+   * topology like "/rack/node" has the max level '3'.
+   */
+  public int getMaxLevel() {
+    return maxLevel;
+  }
+
+  /**
+   * Return the node numbers at level <i>level</i>.
+   * @param level topology level, starting from 1, which means ROOT
+   * @return the number of nodes on the level
+   */
+  public int getNumOfNodes(int level) {
+    Preconditions.checkArgument(level > 0 && level <= maxLevel,
+        "Invalid level");
+    netlock.readLock().lock();
+    try {
+      return clusterTree.getNumOfNodes(level);
+    } finally {
+      netlock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Randomly choose a node in the scope.
+   * @param scope range of nodes from which a node will be chosen. If scope
+   *              starts with ~, choose one from all nodes except for the
+   *              ones in <i>scope</i>; otherwise, choose one from <i>scope</i>.
+   * @return the chosen node
+   */
+  public Node chooseRandom(String scope) {
+    if (scope == null) {
+      scope = ROOT;
+    }
+    if (scope.startsWith(SCOPE_REVERSE_STR)) {
+      return chooseRandom(ROOT, scope.substring(1), null, null,
+          ANCESTOR_GENERATION_DEFAULT);
+    } else {
+      return chooseRandom(scope, null, null, null, ANCESTOR_GENERATION_DEFAULT);
+    }
+  }
+
+  /**
+   * Randomly choose a node in the scope, and not in the excluded scope.
+   * @param scope range of nodes from which a node will be chosen. Cannot
+   *              start with ~
+   * @param excludedScope the chosen node cannot be in this range. Cannot
+   *                      start with ~
+   * @return the chosen node
+   */
+  public Node chooseRandom(String scope, String excludedScope) {
+    return chooseRandom(scope, excludedScope, null, null,
+        ANCESTOR_GENERATION_DEFAULT);
+  }
+
+  /**
+   * Randomly choose a leaf node from <i>scope</i>.
+   *
+   * If scope starts with ~, choose one from all nodes except for the
+   * ones in <i>scope</i>; otherwise, choose nodes from <i>scope</i>.
+   * If excludedNodes is given, choose a node that's not in excludedNodes.
+   *
+   * @param scope range of nodes from which a node will be chosen
+   * @param excludedNodes nodes to be excluded
+   *
+   * @return the chosen node
+   */
+  public Node chooseRandom(String scope, Collection<Node> excludedNodes) {
+    if (scope == null) {
+      scope = ROOT;
+    }
+    if (scope.startsWith(SCOPE_REVERSE_STR)) {
+      return chooseRandom(ROOT, scope.substring(1), excludedNodes, null,
+          ANCESTOR_GENERATION_DEFAULT);
+    } else {
+      return chooseRandom(scope, null, excludedNodes, null,
+          ANCESTOR_GENERATION_DEFAULT);
+    }
+  }
+
+  /**
+   * Randomly choose a leaf node from <i>scope</i>.
+   *
+   * If scope starts with ~, choose one from all nodes except for the
+   * ones in <i>scope</i>; otherwise, choose nodes from <i>scope</i>.
+   * If excludedNodes is given, choose a node that's not in excludedNodes.
+   *
+   * @param scope range of nodes from which a node will be chosen
+   * @param excludedNodes nodes to be excluded.
+   * @param ancestorGen matters when excludedNodes is not null. It is the
+   * ancestor generation that the chosen node is not allowed to share with the
+   * excludedNodes. For example, if ancestorGen is 1, the chosen node cannot
+   * share the same parent with the excludedNodes; if it is 2, they cannot
+   * share the same grandparent, and so on. If ancestorGen is 0, it has no
+   * effect.
+   *
+   * @return the chosen node
+   */
+  public Node chooseRandom(String scope, Collection<Node> excludedNodes,
+      int ancestorGen) {
+    if (scope == null) {
+      scope = ROOT;
+    }
+    if (scope.startsWith(SCOPE_REVERSE_STR)) {
+      return chooseRandom(ROOT, scope.substring(1), excludedNodes, null,
+          ancestorGen);
+    } else {
+      return chooseRandom(scope, null, excludedNodes, null, ancestorGen);
+    }
+  }
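+
+  // Usage sketch (assumes "topology" is populated and "chosen" is a
+  // Collection<Node> of already picked leaves): pick a leaf that does not
+  // share a parent (ancestorGen = 1) with any node in "chosen".
+  //   Node next = topology.chooseRandom(ROOT, chosen, 1);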
+
+  /**
+   * Randomly choose a leaf node.
+   *
+   * @param scope range from which a node will be chosen, cannot start with ~
+   * @param excludedNodes nodes to be excluded
+   * @param excludedScope excluded node range. Cannot start with ~
+   * @param ancestorGen only matters when excludedNodes is not null. It is the
+   * ancestor generation that the chosen node is not allowed to share with any
+   * excluded node. For example, if ancestorGen is 1, the chosen node cannot
+   * share the same parent with any excluded node; if it is 2, they cannot
+   * share the same grandparent, and so on. If ancestorGen is 0, it has no
+   * effect.
+   *
+   * @return the chosen node
+   */
+  public Node chooseRandom(String scope, String excludedScope,
+      Collection<Node> excludedNodes, int ancestorGen) {
+    return chooseRandom(scope, excludedScope, excludedNodes, null, ancestorGen);
+  }
+
+  /**
+   * Randomly choose one leaf node from <i>scope</i>, sharing the same
+   * generation ancestor with <i>affinityNode</i>, and excluding nodes in
+   * <i>excludedScope</i> and <i>excludedNodes</i>.
+   *
+   * @param scope range of nodes from which a node will be chosen, cannot start
+   *              with ~
+   * @param excludedScope range of nodes to be excluded, cannot start with ~
+   * @param excludedNodes nodes to be excluded
+   * @param affinityNode  when not null, the chosen node should share the same
+   *                     ancestor with this node at generation ancestorGen.
+   *                      Ignored when value is null
+   * @param ancestorGen if 0, no same-generation ancestor enforcement on
+   *                     either excludedNodes or affinityNode. If greater
+   *                     than 0, it applies to affinityNode (if not null), or
+   *                     to excludedNodes if affinityNode is null
+   * @return the chosen node
+   */
+  public Node chooseRandom(String scope, String excludedScope,
+      Collection<Node> excludedNodes, Node affinityNode, int ancestorGen) {
+    if (scope == null) {
+      scope = ROOT;
+    }
+
+    checkScope(scope);
+    checkExcludedScope(excludedScope);
+    checkAffinityNode(affinityNode);
+    checkAncestorGen(ancestorGen);
+
+    netlock.readLock().lock();
+    try {
+      return chooseNodeInternal(scope, -1, excludedScope,
+          excludedNodes, affinityNode, ancestorGen);
+    } finally {
+      netlock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Choose the leaf node at index <i>leafIndex</i> from <i>scope</i>, sharing
+   * the same generation ancestor with <i>affinityNode</i>, and excluding
+   * nodes in <i>excludedScope</i> and <i>excludedNodes</i>.
+   *
+   * @param leafIndex index of the node among the available nodes (those not
+   *                  in excludedScope or excludedNodes)
+   * @param scope range of nodes from which a node will be chosen, cannot start
+   *              with ~
+   * @param excludedScope range of nodes to be excluded, cannot start with ~
+   * @param excludedNodes nodes to be excluded
+   * @param affinityNode  when not null, the chosen node should share the same
+   *                     ancestor with this node at generation ancestorGen.
+   *                      Ignored when value is null
+   * @param ancestorGen if 0, no same-generation ancestor enforcement on
+   *                     either excludedNodes or affinityNode. If greater
+   *                     than 0, it applies to affinityNode (if not null), or
+   *                     to excludedNodes if affinityNode is null
+   * @return the chosen node
+   * Example:
+   *
+   *                                /  --- root
+   *                              /  \
+   *                             /    \
+   *                            /      \
+   *                           /        \
+   *                         dc1         dc2
+   *                        / \         / \
+   *                       /   \       /   \
+   *                      /     \     /     \
+   *                    rack1 rack2  rack1  rack2
+   *                   / \     / \  / \     / \
+   *                 n1  n2  n3 n4 n5  n6  n7 n8
+   *
+   *   Input:
+   *   leafIndex = 1
+   *   excludedScope = /dc2
+   *   excludedNodes = {/dc1/rack1/n1}
+   *   affinityNode = /dc1/rack1/n2
+   *   ancestorGen = 2
+   *
+   *   Output:
+   *   node /dc1/rack2/n3
+   *
+   *   Explanation:
+   *   With affinityNode n2 and ancestorGen 2, only leaves in subtree /dc1 can
+   *   be picked. Excluding n1 leaves n2, n3 and n4 available; leafIndex 1
+   *   picks the 2nd of them, n3.
+   *
+   */
+  public Node getNode(int leafIndex, String scope, String excludedScope,
+      Collection<Node> excludedNodes, Node affinityNode, int ancestorGen) {
+    Preconditions.checkArgument(leafIndex >= 0);
+    if (scope == null) {
+      scope = ROOT;
+    }
+    checkScope(scope);
+    checkExcludedScope(excludedScope);
+    checkAffinityNode(affinityNode);
+    checkAncestorGen(ancestorGen);
+
+    netlock.readLock().lock();
+    try {
+      return chooseNodeInternal(scope, leafIndex, excludedScope,
+          excludedNodes, affinityNode, ancestorGen);
+    } finally {
+      netlock.readLock().unlock();
+    }
+  }
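+
+  // Usage sketch matching the example above (n1 and n2 are the Node objects
+  // at /dc1/rack1/n1 and /dc1/rack1/n2):
+  //   Node picked = topology.getNode(1, ROOT, "/dc2",
+  //       Arrays.asList(n1), n2, 2);  // the node at /dc1/rack2/n3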
+
+  private Node chooseNodeInternal(String scope, int leafIndex,
+      String excludedScope, Collection<Node> excludedNodes, Node affinityNode,
+      int ancestorGen) {
+    Preconditions.checkArgument(scope != null);
+
+    String finalScope = scope;
+    if (affinityNode != null && ancestorGen > 0) {
+      Node affinityAncestor = affinityNode.getAncestor(ancestorGen);
+      if (affinityAncestor == null) {
+        throw new IllegalArgumentException("affinityNode " +
+            affinityNode.getNetworkFullPath() + " doesn't have ancestor on" +
+            " generation  " + ancestorGen);
+      }
+      // affinity ancestor should has overlap with scope
+      if (affinityAncestor.getNetworkFullPath().startsWith(scope)){
+        finalScope = affinityAncestor.getNetworkFullPath();
+      } else if (!scope.startsWith(affinityAncestor.getNetworkFullPath())) {
+        return null;
+      }
+      // reset ancestor generation since the new scope is identified now
+      ancestorGen = 0;
+    }
+
+    // check overlap of excludedScope and finalScope
+    if (excludedScope != null) {
+      // excludeScope covers finalScope
+      if (finalScope.startsWith(excludedScope)) {
+        return null;
+      }
+      // excludeScope and finalScope share nothing
+      if (!excludedScope.startsWith(finalScope)) {
+        excludedScope = null;
+      }
+    }
+
+    // clone excludedNodes before removing duplicates from it
+    Collection<Node> mutableExNodes = null;
+    if (excludedNodes != null) {
+      // Remove duplicates from excludedNodes
+      mutableExNodes =
+          excludedNodes.stream().distinct().collect(Collectors.toList());
+    }
+
+    // remove duplicates between mutableExNodes and excludedScope, given
+    // ancestorGen
+    excludedScope = NetUtils.removeDuplicate(this, mutableExNodes,
+        excludedScope, ancestorGen);
+
+    // calculate available node count
+    Node scopeNode = getNode(finalScope);
+    int availableNodes = getAvailableNodesCount(
+        scopeNode.getNetworkFullPath(), excludedScope, mutableExNodes,
+        ancestorGen);
+
+    if (availableNodes <= 0) {
+      LOG.warn("No available node in (scope=\"{}\" excludedScope=\"{}\" " +
+              "excludedNodes=\"{}\"  ancestorGen=\"{}\").",
+          scopeNode.getNetworkFullPath(), excludedScope, excludedNodes,
+          ancestorGen);
+      return null;
+    }
+    LOG.debug("Choosing random from \"{}\" available nodes on node \"{}\"," +
+            " scope=\"{}\", excludedScope=\"{}\", excludeNodes=\"{}\".",
+        availableNodes, scopeNode, scopeNode.getNetworkFullPath(),
+        excludedScope, excludedNodes);
+
+    // scope is a Leaf node
+    if (!(scopeNode instanceof InnerNode)) {
+      return scopeNode;
+    }
+
+    Node ret;
+    if (leafIndex >= 0) {
+      ret = ((InnerNode)scopeNode).getLeaf(leafIndex % availableNodes,
+          excludedScope, mutableExNodes, ancestorGen);
+    } else {
+      final int index = ThreadLocalRandom.current().nextInt(availableNodes);
+      ret = ((InnerNode)scopeNode).getLeaf(index, excludedScope, mutableExNodes,
+          ancestorGen);
+    }
+    LOG.debug("chooseRandom return {}", ret);
+    return ret;
+  }
+
+  /** Return the distance cost between two nodes.
+   * The distance cost from a node to its parent is the parent's cost.
+   * The distance cost between two nodes is calculated by summing up their
+   * distance costs to their closest common ancestor.
+   * @param node1 one node
+   * @param node2 another node
+   * @return the distance cost between node1 and node2 which is zero if they
+   * are the same or {@link Integer#MAX_VALUE} if node1 or node2 do not belong
+   * to the cluster
+   */
+  public int getDistanceCost(Node node1, Node node2) {
+    if ((node1 != null && node2 != null && node1.equals(node2)) ||
+        (node1 == null && node2 == null))  {
+      return 0;
+    }
+    int cost = 0;
+    netlock.readLock().lock();
+    try {
+      if (node1 == null || node2 == null ||
+          (node1.getAncestor(maxLevel - 1) != clusterTree) ||
+          (node2.getAncestor(maxLevel - 1) != clusterTree)) {
+        LOG.warn("One of the nodes is a null pointer");
+        return Integer.MAX_VALUE;
+      }
+      int level1 = node1.getLevel();
+      int level2 = node2.getLevel();
+      if (level1 > maxLevel || level2 > maxLevel) {
+        return Integer.MAX_VALUE;
+      }
+      while (level1 > level2 && node1 != null) {
+        node1 = node1.getParent();
+        level1--;
+        cost += node1 == null ? 0 : node1.getCost();
+      }
+      while (level2 > level1 && node2 != null) {
+        node2 = node2.getParent();
+        level2--;
+        cost += node2 == null ? 0 : node2.getCost();
+      }
+      while (node1 != null && node2 != null && node1 != node2) {
+        node1 = node1.getParent();
+        node2 = node2.getParent();
+        cost += node1 == null ? 0 : node1.getCost();
+        cost += node2 == null ? 0 : node2.getCost();
+      }
+      return cost;
+    } finally {
+      netlock.readLock().unlock();
+    }
+  }
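+
+  // Usage sketch (assumes the default schema, where every inner node costs 1
+  // and every leaf costs 0): two leaves under the same rack are at distance
+  // cost 2, one rack cost on each side of the common ancestor.
+  //   int d = topology.getDistanceCost(n1, n2); // n1, n2 in the same rack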
+
+  /**
+   * Sort the nodes array by network distance to <i>reader</i> to reduce
+   * network traffic and improve performance.
+   *
+   * As an additional twist, we also randomize the nodes at each network
+   * distance. This helps with load balancing when there is data skew.
+   *
+   * @param reader    Node that needs the data
+   * @param nodes     Available replicas with the requested data
+   * @param activeLen Number of active nodes at the front of the array
+   */
+  public void sortByDistanceCost(Node reader, Node[] nodes, int activeLen) {
+    // Sort weights for the nodes array
+    int[] costs = new int[activeLen];
+    for (int i = 0; i < activeLen; i++) {
+      costs[i] = getDistanceCost(reader, nodes[i]);
+    }
+    // Add cost/node pairs to a TreeMap to sort
+    TreeMap<Integer, List<Node>> tree = new TreeMap<>();
+    for (int i = 0; i < activeLen; i++) {
+      int cost = costs[i];
+      Node node = nodes[i];
+      List<Node> list = tree.get(cost);
+      if (list == null) {
+        list = Lists.newArrayListWithExpectedSize(1);
+        tree.put(cost, list);
+      }
+      list.add(node);
+    }
+    int idx = 0;
+    for (List<Node> list: tree.values()) {
+      if (list != null) {
+        Collections.shuffle(list);
+        for (Node n: list) {
+          nodes[idx] = n;
+          idx++;
+        }
+      }
+    }
+    Preconditions.checkState(idx == activeLen, "Wrong number of nodes sorted!");
+  }
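+
+  // Usage sketch (assumes "reader" and the "replicas" array are resolved
+  // elsewhere): order replicas by their distance cost to the reader.
+  //   topology.sortByDistanceCost(reader, replicas, replicas.length);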
+
+  /**
+   * Return the number of leaves in <i>scope</i> but not in
+   * <i>excludedNodes</i> and <i>excludedScope</i>.
+   * @param scope the scope
+   * @param excludedScope excluded scope
+   * @param mutableExcludedNodes a list of excluded nodes, whose content might
+   *                            be changed by this call
+   * @param ancestorGen the ancestor generation that available nodes must not
+   *                    share with any excluded node
+   * @return number of available nodes
+   */
+  private int getAvailableNodesCount(String scope, String excludedScope,
+      Collection<Node> mutableExcludedNodes, int ancestorGen) {
+    Preconditions.checkArgument(scope != null);
+
+    Node scopeNode = getNode(scope);
+    if (scopeNode == null) {
+      return 0;
+    }
+    NetUtils.removeOutscope(mutableExcludedNodes, scope);
+    List<Node> excludedAncestorList =
+        NetUtils.getAncestorList(this, mutableExcludedNodes, ancestorGen);
+    for (Node ancestor : excludedAncestorList) {
+      if (scope.startsWith(ancestor.getNetworkFullPath())) {
+        return 0;
+      }
+    }
+    // number of nodes to exclude
+    int excludedCount = 0;
+    if (excludedScope != null) {
+      Node excludedScopeNode = getNode(excludedScope);
+      if (excludedScopeNode != null) {
+        if (excludedScope.startsWith(scope)) {
+          excludedCount += excludedScopeNode.getNumOfLeaves();
+        } else if (scope.startsWith(excludedScope)) {
+          return 0;
+        }
+      }
+    }
+    // handle the case where excludedNodes is not null
+    if (mutableExcludedNodes != null && (!mutableExcludedNodes.isEmpty())) {
+      if (ancestorGen == 0) {
+        for (Node node: mutableExcludedNodes) {
+          if (contains(node)) {
+            excludedCount++;
+          }
+        }
+      } else {
+        for (Node ancestor : excludedAncestorList) {
+          if (ancestor.getNetworkFullPath().startsWith(scope)) {
+            excludedCount += ancestor.getNumOfLeaves();
+          }
+        }
+      }
+    }
+
+    int availableCount = scopeNode.getNumOfLeaves() - excludedCount;
+    Preconditions.checkState(availableCount >= 0);
+    return availableCount;
+  }
+
+  @Override
+  public String toString() {
+    // print max level
+    StringBuilder tree = new StringBuilder();
+    tree.append("Level: ");
+    tree.append(maxLevel);
+    tree.append("\n");
+    netlock.readLock().lock();
+    try {
+      // print the number of leaves
+      int numOfLeaves = clusterTree.getNumOfLeaves();
+      tree.append("Expected number of leaves:");
+      tree.append(numOfLeaves);
+      tree.append("\n");
+      // print all nodes
+      for (int i = 0; i < numOfLeaves; i++) {
+        tree.append(clusterTree.getLeaf(i).getNetworkFullPath());
+        tree.append("\n");
+      }
+    } finally {
+      netlock.readLock().unlock();
+    }
+    return tree.toString();
+  }
+
+  private void checkScope(String scope) {
+    if (scope != null && scope.startsWith(SCOPE_REVERSE_STR)) {
+      throw new IllegalArgumentException("scope " + scope +
+          " should not start with " + SCOPE_REVERSE_STR);
+    }
+  }
+
+  private void checkExcludedScope(String excludedScope) {
+    if (excludedScope != null &&
+        (excludedScope.startsWith(SCOPE_REVERSE_STR))) {
+      throw new IllegalArgumentException("excludedScope " + excludedScope +
+          " cannot start with " + SCOPE_REVERSE_STR);
+    }
+  }
+
+  private void checkAffinityNode(Node affinityNode) {
+    if (affinityNode != null && (!contains(affinityNode))) {
+      throw new IllegalArgumentException("Affinity node " +
+          affinityNode.getNetworkFullPath() + " is not a member of topology");
+    }
+  }
+
+  private void checkAncestorGen(int ancestorGen) {
+    if (ancestorGen > (maxLevel - 1) || ancestorGen < 0) {
+      throw new IllegalArgumentException("ancestorGen " + ancestorGen +
+          " exceeds this network topology acceptable level [0, " +
+          (maxLevel - 1) + "]");
+    }
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/Node.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/Node.java
new file mode 100644
index 0000000..310b336
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/Node.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.net;
+
+/**
+ * The interface defines a node in a network topology.
+ * A node may be a leaf representing a data node or an inner
+ * node representing a data center or rack.
+ * Each node has a name and its location in the network is
+ * decided by a string with syntax similar to a file name.
+ * For example, a data node's name is hostname:port# and if it's located at
+ * rack "orange" in data center "dog", the string representation of its
+ * network location will be /dog/orange.
+ */
+public interface Node {
+  /** @return the string representation of this node's network location path,
+   *  excluding itself. In other words, its parent's full network location */
+  String getNetworkLocation();
+
+  /** @return this node's own name in the network topology. This should be the
+   * node's IP address or hostname.
+   * */
+  String getNetworkName();
+
+  /** @return this node's full path in network topology. It's the concatenation
+   *  of location and name.
+   * */
+  String getNetworkFullPath();
+
+  /** @return this node's parent */
+  InnerNode getParent();
+
+  /**
+   * Set this node's parent.
+   * @param parent the parent
+   */
+  void setParent(InnerNode parent);
+
+  /** @return this node's ancestor, generation 0 is itself, generation 1 is
+   *  node's parent, and so on.*/
+  Node getAncestor(int generation);
+
+  /**
+   * @return this node's level in the tree.
+   * E.g. the root of a tree returns 1 and root's children return 2
+   */
+  int getLevel();
+
+  /**
+   * Set this node's level in the tree.
+   * @param i the level
+   */
+  void setLevel(int i);
+
+  /**
+   * @return this node's cost when network traffic goes through it.
+   * E.g. the cost of crossing a switch is 1, and the cost of going through a
+   * datacenter can be 5.
+   * By default the cost of a leaf datanode is 0 and of any other node is 1.
+   */
+  int getCost();
+
+  /** @return the leaf nodes number under this node. */
+  int getNumOfLeaves();
+
+  /**
+   * Check whether this node is an ancestor of node <i>n</i>.
+   * A node is considered an ancestor of itself.
+   *
+   * @param n a node
+   * @return true if this node is an ancestor of <i>n</i>
+   */
+  boolean isAncestor(Node n);
+}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NodeImpl.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NodeImpl.java
new file mode 100644
index 0000000..a9763b9
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NodeImpl.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.net;
+
+import com.google.common.base.Preconditions;
+
+import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT;
+import static org.apache.hadoop.hdds.scm.net.NetConstants.PATH_SEPARATOR_STR;
+
+/**
+ * A thread safe class that implements interface Node.
+ */
+public class NodeImpl implements Node {
+  // host:port#
+  private final String name;
+  // string representation of this node's location, such as /dc1/rack1
+  private final String location;
+  // location + "/" + name
+  private final String path;
+  // which level of the tree the node resides, start from 1 for root
+  private int level;
+  // node's parent
+  private InnerNode parent;
+  // the cost to go through this node
+  private final int cost;
+
+  /**
+   * Construct a node from its name and its location.
+   * @param name this node's name (can be null, must not contain
+   * {@link NetConstants#PATH_SEPARATOR})
+   * @param location this node's location
+   * @param cost this node's cost if traffic goes through it
+   */
+  public NodeImpl(String name, String location, int cost) {
+    if (name != null && name.contains(PATH_SEPARATOR_STR)) {
+      throw new IllegalArgumentException(
+          "Network location name:" + name + " should not contain " +
+              PATH_SEPARATOR_STR);
+    }
+    this.name = (name == null) ? ROOT : name;
+    this.location = NetUtils.normalize(location);
+    this.path = this.location.equals(PATH_SEPARATOR_STR) ?
+        this.location + this.name :
+        this.location + PATH_SEPARATOR_STR + this.name;
+
+    this.cost = cost;
+  }
+
+  /**
+   * Construct a node from its name and its location.
+   *
+   * @param name     this node's name (can be null, must not contain
+   *                 {@link NetConstants#PATH_SEPARATOR})
+   * @param location this node's location
+   * @param parent   this node's parent node
+   * @param level    this node's level in the tree
+   * @param cost     this node's cost if traffic goes through it
+   */
+  public NodeImpl(String name, String location, InnerNode parent, int level,
+      int cost) {
+    this(name, location, cost);
+    this.parent = parent;
+    this.level = level;
+  }
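+
+  // Usage sketch (values are examples only): a leaf datanode named
+  // "host1:9858" placed under /dc1/rack1 with cost 0.
+  //   Node leaf = new NodeImpl("host1:9858", "/dc1/rack1", 0);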
+
+  /**
+   * @return this node's name
+   */
+  public String getNetworkName() {
+    return name;
+  }
+
+  /**
+   * @return this node's network location
+   */
+  public String getNetworkLocation() {
+    return location;
+  }
+
+  /**
+   * @return this node's full path in network topology. It's the concatenation
+   * of location and name.
+   */
+  public String getNetworkFullPath() {
+    return path;
+  }
+
+  /**
+   * @return this node's parent
+   */
+  public InnerNode getParent() {
+    return parent;
+  }
+
+  /**
+   * @return this node's ancestor, generation 0 is itself, generation 1 is
+   * node's parent, and so on.
+   */
+  public Node getAncestor(int generation) {
+    Preconditions.checkArgument(generation >= 0);
+    Node current = this;
+    while (generation > 0 && current != null) {
+      current = current.getParent();
+      generation--;
+    }
+    return current;
+  }
+
+  /**
+   * Set this node's parent.
+   *
+   * @param parent the parent
+   */
+  public void setParent(InnerNode parent) {
+    this.parent = parent;
+  }
+
+  /**
+   * @return this node's level in the tree.
+   * E.g. the root of a tree returns 1 and its children return 2
+   */
+  public int getLevel() {
+    return this.level;
+  }
+
+  /**
+   * Set this node's level in the tree.
+   *
+   * @param level the level
+   */
+  public void setLevel(int level) {
+    this.level = level;
+  }
+
+  /**
+   * @return this node's cost when network traffic goes through it.
+   * E.g. the cost of crossing a switch is 1, and the cost of going through a
+   * datacenter is 5.
+   * By default the cost of a leaf datanode is 0 and of any other inner node
+   * is 1.
+   */
+  public int getCost() {
+    return this.cost;
+  }
+
+  /** @return the leaf nodes number under this node. */
+  public int getNumOfLeaves() {
+    return 1;
+  }
+
+  /**
+   * Check if this node is an ancestor of node <i>node</i>. A node is
+   * considered an ancestor of itself.
+   * @param node a node
+   * @return true if this node is an ancestor of <i>node</i>
+   */
+  public boolean isAncestor(Node node) {
+    return this.getNetworkFullPath().equals(PATH_SEPARATOR_STR) ||
+        node.getNetworkLocation().startsWith(this.getNetworkFullPath()) ||
+            node.getNetworkFullPath().equalsIgnoreCase(
+                this.getNetworkFullPath());
+  }
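+
+  // Usage sketch: with rack = new NodeImpl("rack1", "/dc1", 1) and
+  // leaf = new NodeImpl("n1", "/dc1/rack1", 0), rack.isAncestor(leaf) and
+  // leaf.isAncestor(leaf) both return true.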
+
+  @Override
+  public boolean equals(Object to) {
+    if (to == null) {
+      return false;
+    }
+    if (this == to) {
+      return true;
+    }
+    return this.toString().equals(to.toString());
+  }
+
+  @Override
+  public int hashCode() {
+    return toString().hashCode();
+  }
+
+  /**
+   * @return this node's path as its string representation
+   */
+  @Override
+  public String toString() {
+    return getNetworkFullPath();
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NodeSchema.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NodeSchema.java
new file mode 100644
index 0000000..8c289f7
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NodeSchema.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.net;
+
+import org.apache.hadoop.HadoopIllegalArgumentException;
+
+/**
+ * Network topology schema that keeps the layer-related information.
+ */
+public final class NodeSchema {
+  /**
+   * Network topology layer type enum definition.
+   */
+  public enum LayerType {
+    ROOT("Root", NetConstants.INNER_NODE_COST_DEFAULT),
+    INNER_NODE("InnerNode", NetConstants.INNER_NODE_COST_DEFAULT),
+    LEAF_NODE("Leaf", NetConstants.NODE_COST_DEFAULT);
+
+    private final String description;
+    // default cost
+    private final int cost;
+
+    LayerType(String description, int cost) {
+      this.description = description;
+      this.cost = cost;
+    }
+
+    @Override
+    public String toString() {
+      return description;
+    }
+
+    public int getCost() {
+      return cost;
+    }
+
+    public static LayerType getType(String typeStr) {
+      for (LayerType type: LayerType.values()) {
+        if (typeStr.equalsIgnoreCase(type.toString())) {
+          return type;
+        }
+      }
+      return null;
+    }
+  }
+
+  // default cost
+  private final int cost;
+  // layer Type, mandatory property
+  private final LayerType type;
+  // default name, can be null or ""
+  private final String defaultName;
+  // layer prefix, can be null or ""
+  private final String prefix;
+
+  /**
+   * Builder for NodeSchema.
+   */
+  public static class Builder {
+    private int cost = -1;
+    private LayerType type;
+    private String defaultName;
+    private String prefix;
+
+    public Builder setCost(int nodeCost) {
+      this.cost = nodeCost;
+      return this;
+    }
+
+    public Builder setPrefix(String nodePrefix) {
+      this.prefix = nodePrefix;
+      return this;
+    }
+
+    public Builder setType(LayerType nodeType) {
+      this.type = nodeType;
+      return this;
+    }
+
+    public Builder setDefaultName(String nodeDefaultName) {
+      this.defaultName = nodeDefaultName;
+      return this;
+    }
+
+    public NodeSchema build() {
+      if (type == null) {
+        throw new HadoopIllegalArgumentException("Type is mandatory for a " +
+            "network topology node layer definition");
+      }
+      if (cost == -1) {
+        cost = type.getCost();
+      }
+      return new NodeSchema(type, cost, prefix, defaultName);
+    }
+  }
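+
+  // Usage sketch (values are examples only): build a rack layer schema; the
+  // cost falls back to the layer type default when not set.
+  //   NodeSchema rack = new NodeSchema.Builder()
+  //       .setType(NodeSchema.LayerType.INNER_NODE)
+  //       .setPrefix("rack")
+  //       .setDefaultName("/default-rack")
+  //       .build();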
+
+  /**
+   * Constructor.
+   * @param type layer type
+   * @param cost layer's default cost
+   * @param prefix layer's prefix
+   * @param defaultName layer's default name, if specified
+   */
+  public NodeSchema(LayerType type, int cost, String prefix,
+      String defaultName) {
+    this.type = type;
+    this.cost = cost;
+    this.prefix = prefix;
+    this.defaultName = defaultName;
+  }
+
+  public boolean matchPrefix(String name) {
+    if (name == null || name.isEmpty() || prefix == null || prefix.isEmpty()) {
+      return false;
+    }
+    return name.trim().toLowerCase().startsWith(prefix.toLowerCase());
+  }
+
+  public LayerType getType() {
+    return this.type;
+  }
+
+  public String getPrefix() {
+    return this.prefix;
+  }
+
+  public String getDefaultName() {
+    return this.defaultName;
+  }
+
+  public int getCost() {
+    return this.cost;
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NodeSchemaLoader.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NodeSchemaLoader.java
new file mode 100644
index 0000000..9125fb7
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NodeSchemaLoader.java
@@ -0,0 +1,388 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.net;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.w3c.dom.Text;
+import org.xml.sax.SAXException;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hdds.scm.net.NodeSchema.LayerType;
+
+/**
+ * A network topology layer schema loading tool that loads user-defined network
+ * layer schema data from an XML configuration file.
+ */
+public final class NodeSchemaLoader {
+  private static final Logger LOG
+      = LoggerFactory.getLogger(NodeSchemaLoader.class);
+  private static final String CONFIGURATION_TAG = "configuration";
+  private static final String LAYOUT_VERSION_TAG = "layoutversion";
+  private static final String TOPOLOGY_TAG = "topology";
+  private static final String TOPOLOGY_PATH = "path";
+  private static final String TOPOLOGY_ENFORCE_PREFIX = "enforceprefix";
+  private static final String LAYERS_TAG = "layers";
+  private static final String LAYER_TAG = "layer";
+  private static final String LAYER_ID = "id";
+  private static final String LAYER_TYPE = "type";
+  private static final String LAYER_COST = "cost";
+  private static final String LAYER_PREFIX = "prefix";
+  private static final String LAYER_DEFAULT_NAME = "default";
+
+  private static final int LAYOUT_VERSION = 1;
+  private static volatile NodeSchemaLoader instance = null;
+  private NodeSchemaLoader() {}
+
+  public static NodeSchemaLoader getInstance() {
+    if (instance == null) {
+      instance = new NodeSchemaLoader();
+    }
+    return instance;
+  }
+
+  /**
+   * Class to hold the result of parsing a network topology schema file.
+   */
+  public static class NodeSchemaLoadResult {
+    private List<NodeSchema> schemaList;
+    private boolean enforcePrefix;
+
+    NodeSchemaLoadResult(List<NodeSchema> schemaList, boolean enforcePrefix) {
+      this.schemaList = schemaList;
+      this.enforcePrefix = enforcePrefix;
+    }
+
+    public boolean isEnforcePrefix() {
+      return enforcePrefix;
+    }
+
+    public List<NodeSchema> getSchemaList() {
+      return schemaList;
+    }
+  }
+
+  /**
+   * Load user-defined network layer schemas from an XML configuration file.
+   * @param schemaFilePath path of schema file
+   * @return all valid node schemas defined in schema file
+   */
+  public NodeSchemaLoadResult loadSchemaFromFile(String schemaFilePath)
+      throws IllegalArgumentException {
+    try {
+      File schemaFile = new File(schemaFilePath);
+      if (!schemaFile.exists()) {
+        String msg = "Network topology layer schema file " + schemaFilePath +
+            " is not found.";
+        LOG.warn(msg);
+        throw new IllegalArgumentException(msg);
+      }
+      return loadSchema(schemaFile);
+    } catch (ParserConfigurationException | IOException | SAXException e) {
+      throw new IllegalArgumentException("Fail to load network topology node"
+          + " schema file: " + schemaFilePath + " , error:" + e.getMessage());
+    }
+  }
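+
+  // Usage sketch (the file path is an example only):
+  //   NodeSchemaLoadResult result = NodeSchemaLoader.getInstance()
+  //       .loadSchemaFromFile("/etc/hadoop/network-topology-default.xml");
+  //   List<NodeSchema> layers = result.getSchemaList();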
+
+  /**
+   * Load network topology layer schemas from a XML configuration file.
+   * @param schemaFile schema file
+   * @return all valid node schemas defined in schema file
+   * @throws ParserConfigurationException ParserConfigurationException happen
+   * @throws IOException no such schema file
+   * @throws SAXException xml file has some invalid elements
+   * @throws IllegalArgumentException xml file content is logically invalid
+   */
+  private NodeSchemaLoadResult loadSchema(File schemaFile) throws
+      ParserConfigurationException, SAXException, IOException {
+    LOG.info("Loading network topology layer schema file " + schemaFile);
+    // Read and parse the schema file.
+    DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
+    dbf.setIgnoringComments(true);
+    DocumentBuilder builder = dbf.newDocumentBuilder();
+    Document doc = builder.parse(schemaFile);
+    Element root = doc.getDocumentElement();
+
+    if (!CONFIGURATION_TAG.equals(root.getTagName())) {
+      throw new IllegalArgumentException("Bad network topology layer schema " +
+          "configuration file: top-level element not <" + CONFIGURATION_TAG +
+          ">");
+    }
+    NodeSchemaLoadResult schemaList;
+    if (root.getElementsByTagName(LAYOUT_VERSION_TAG).getLength() == 1) {
+      if (loadLayoutVersion(root) == LAYOUT_VERSION) {
+        if (root.getElementsByTagName(LAYERS_TAG).getLength() == 1) {
+          Map<String, NodeSchema> schemas = loadLayersSection(root);
+          if (root.getElementsByTagName(TOPOLOGY_TAG).getLength() == 1) {
+            schemaList = loadTopologySection(root, schemas);
+          } else {
+            throw new IllegalArgumentException("Bad network topology layer " +
+                "schema configuration file: no or multiple <" + TOPOLOGY_TAG +
+                "> element");
+          }
+        } else {
+          throw new IllegalArgumentException("Bad network topology layer schema"
+              + " configuration file: no or multiple <" + LAYERS_TAG +
+              ">element");
+        }
+      } else {
+        throw new IllegalArgumentException("The parse failed because of bad "
+            + LAYOUT_VERSION_TAG + " value, expected:" + LAYOUT_VERSION);
+      }
+    } else {
+      throw new IllegalArgumentException("Bad network topology layer schema " +
+          "configuration file: no or multiple <" + LAYOUT_VERSION_TAG +
+          "> elements");
+    }
+    return schemaList;
+  }
+
+  /**
+   * Load layoutVersion from root element in the XML configuration file.
+   * @param root root element
+   * @return layout version
+   */
+  private int loadLayoutVersion(Element root) {
+    int layoutVersion;
+    Text text = (Text) root.getElementsByTagName(LAYOUT_VERSION_TAG)
+        .item(0).getFirstChild();
+    if (text != null) {
+      String value = text.getData().trim();
+      try {
+        layoutVersion = Integer.parseInt(value);
+      } catch (NumberFormatException e) {
+        throw new IllegalArgumentException("Bad " + LAYOUT_VERSION_TAG +
+            " value " + value + " is found. It should be an integer.");
+      }
+    } else {
+      throw new IllegalArgumentException("Value of <" + LAYOUT_VERSION_TAG +
+          "> is null");
+    }
+    return layoutVersion;
+  }
+
+  /**
+   * Load layers from root element in the XML configuration file.
+   * @param root root element
+   * @return A map of node schemas with layer ID and layer schema
+   */
+  private Map<String, NodeSchema> loadLayersSection(Element root) {
+    NodeList elements = root.getElementsByTagName(LAYER_TAG);
+    Map<String, NodeSchema> schemas = new HashMap<>();
+    for (int i = 0; i < elements.getLength(); i++) {
+      Node node = elements.item(i);
+      if (node instanceof Element) {
+        Element element = (Element) node;
+        if (LAYER_TAG.equals(element.getTagName())) {
+          String layerId = element.getAttribute(LAYER_ID);
+          NodeSchema schema = parseLayerElement(element);
+          if (!schemas.containsValue(schema)) {
+            schemas.put(layerId, schema);
+          } else {
+            throw new IllegalArgumentException("Repetitive layer in network " +
+                "topology node schema configuration file: " + layerId);
+          }
+        } else {
+          throw new IllegalArgumentException("Bad element in network topology "
+              + "node schema configuration file: " + element.getTagName());
+        }
+      }
+    }
+
+    // Integrity check, only one ROOT and one LEAF is allowed
+    boolean foundRoot = false;
+    boolean foundLeaf = false;
+    for (NodeSchema schema : schemas.values()) {
+      if (schema.getType() == LayerType.ROOT) {
+        if (foundRoot) {
+          throw new IllegalArgumentException("Multiple ROOT layers are found" +
+              " in network topology schema configuration file");
+        } else {
+          foundRoot = true;
+        }
+      }
+      if (schema.getType() == LayerType.LEAF_NODE) {
+        if (foundLeaf) {
+          throw new IllegalArgumentException("Multiple LEAF layers are found" +
+              " in network topology schema configuration file");
+        } else {
+          foundLeaf = true;
+        }
+      }
+    }
+    if (!foundRoot) {
+      throw new IllegalArgumentException("No ROOT layer is found" +
+          " in network topology schema configuration file");
+    }
+    if (!foundLeaf) {
+      throw new IllegalArgumentException("No LEAF layer is found" +
+          " in network topology schema configuration file");
+    }
+    return schemas;
+  }
+
+  /**
+   * Load network topology from root element in the XML configuration file and
+   * sort node schemas according to the topology path.
+   * @param root root element
+   * @param schemas schema map
+   * @return all valid node schemas defined in schema file
+   */
+  private NodeSchemaLoadResult loadTopologySection(Element root,
+      Map<String, NodeSchema> schemas) {
+    NodeList elements = root.getElementsByTagName(TOPOLOGY_TAG)
+        .item(0).getChildNodes();
+    List<NodeSchema> schemaList = new ArrayList<>();
+    boolean enforcePrefix = false;
+    for (int i = 0; i < elements.getLength(); i++) {
+      Node node = elements.item(i);
+      if (node instanceof Element) {
+        Element element = (Element) node;
+        String tagName = element.getTagName();
+        // Get the nonnull text value.
+        Text text = (Text) element.getFirstChild();
+        String value;
+        if (text != null) {
+          value = text.getData().trim();
+          if (value.isEmpty()) {
+            // Element with empty value is ignored
+            continue;
+          }
+        } else {
+          throw new IllegalArgumentException("Value of <" + tagName
+              + "> is null");
+        }
+        if (TOPOLOGY_PATH.equals(tagName)) {
+          if (value.startsWith(NetConstants.PATH_SEPARATOR_STR)) {
+            value = value.substring(1);
+          }
+          String[] layerIDs = value.split(NetConstants.PATH_SEPARATOR_STR);
+          if (layerIDs == null || layerIDs.length != schemas.size()) {
+            throw new IllegalArgumentException("Topology path depth doesn't "
+                + "match layer element numbers");
+          }
+          for (int j = 0; j < layerIDs.length; j++) {
+            if (schemas.get(layerIDs[j]) == null) {
+              throw new IllegalArgumentException("No layer found for id " +
+                  layerIDs[j]);
+            }
+          }
+          if (schemas.get(layerIDs[0]).getType() != LayerType.ROOT) {
+            throw new IllegalArgumentException("Topology path doesn't start "
+                + "with ROOT layer");
+          }
+          if (schemas.get(layerIDs[layerIDs.length - 1]).getType() !=
+              LayerType.LEAF_NODE) {
+            throw new IllegalArgumentException("Topology path doesn't end "
+                + "with LEAF layer");
+          }
+          for (int j = 0; j < layerIDs.length; j++) {
+            schemaList.add(schemas.get(layerIDs[j]));
+          }
+        } else if (TOPOLOGY_ENFORCE_PREFIX.equalsIgnoreCase(tagName)) {
+          enforcePrefix = Boolean.parseBoolean(value);
+        } else {
+          throw new IllegalArgumentException("Unsupported Element <" +
+              tagName + ">");
+        }
+      }
+    }
+    // Integrity check
+    if (enforcePrefix) {
+      // Every InnerNode should have prefix defined
+      for (NodeSchema schema: schemas.values()) {
+        if (schema.getType() == LayerType.INNER_NODE &&
+            schema.getPrefix() == null) {
+          throw new IllegalArgumentException("There is layer without prefix " +
+              "defined while prefix is enforced.");
+        }
+      }
+    }
+    return new NodeSchemaLoadResult(schemaList, enforcePrefix);
+  }
+
+  /**
+   * Load a layer from a layer element in the XML configuration file.
+   * @param element network topology node layer element
+   * @return NodeSchema
+   */
+  private NodeSchema parseLayerElement(Element element) {
+    NodeList fields = element.getChildNodes();
+    LayerType type = null;
+    int cost = 0;
+    String prefix = null;
+    String defaultName = null;
+    for (int i = 0; i < fields.getLength(); i++) {
+      Node fieldNode = fields.item(i);
+      if (fieldNode instanceof Element) {
+        Element field = (Element) fieldNode;
+        String tagName = field.getTagName();
+        // Get the nonnull text value.
+        Text text = (Text) field.getFirstChild();
+        String value;
+        if (text != null) {
+          value = text.getData().trim();
+          if (value.isEmpty()) {
+            // Element with empty value is ignored
+            continue;
+          }
+        } else {
+          continue;
+        }
+        if (LAYER_COST.equalsIgnoreCase(tagName)) {
+          cost = Integer.parseInt(value);
+          if (cost < 0) {
+            throw new IllegalArgumentException(
+                "Cost should be positive number or 0");
+          }
+        } else if (LAYER_TYPE.equalsIgnoreCase(tagName)) {
+          type = NodeSchema.LayerType.getType(value);
+          if (type == null) {
+            throw new IllegalArgumentException(
+                "Unsupported layer type:" + value);
+          }
+        } else if (LAYER_PREFIX.equalsIgnoreCase(tagName)) {
+          prefix = value;
+        } else if (LAYER_DEFAULT_NAME.equalsIgnoreCase(tagName)) {
+          defaultName = value;
+        } else {
+          throw new IllegalArgumentException("Unsupported Element <" + tagName
+              + ">");
+        }
+      }
+    }
+    // type is a mandatory property
+    if (type == null) {
+      throw new IllegalArgumentException("Missing type Element");
+    }
+    return new NodeSchema(type, cost, prefix, defaultName);
+  }
+}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NodeSchemaManager.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NodeSchemaManager.java
new file mode 100644
index 0000000..8f2fac7
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/NodeSchemaManager.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.net;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.net.NodeSchemaLoader.NodeSchemaLoadResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/** The class manages all network topology schemas. */
+public final class NodeSchemaManager {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      NodeSchemaManager.class);
+
+  // All schema saved and sorted from ROOT to LEAF node
+  private List<NodeSchema> allSchema;
+  // enforcePrefix only applies to INNER_NODE
+  private boolean enforcePrefix;
+  // max level, includes ROOT level
+  private int maxLevel = -1;
+
+  private static volatile NodeSchemaManager instance = null;
+
+  private NodeSchemaManager() {
+  }
+
+  public static NodeSchemaManager getInstance() {
+    if (instance == null) {
+      instance = new NodeSchemaManager();
+    }
+    return instance;
+  }
+
+  public void init(Configuration conf) {
+    // Load schemas from the network topology schema configuration file.
+    String schemaFile = conf.get(
+        ScmConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE,
+        ScmConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE_DEFAULT);
+
+    NodeSchemaLoadResult result;
+    try {
+      result = NodeSchemaLoader.getInstance().loadSchemaFromFile(schemaFile);
+      allSchema = result.getSchemaList();
+      enforcePrefix = result.isEnforcePrefix();
+      maxLevel = allSchema.size();
+    } catch (Throwable e) {
+      String msg = "Fail to load schema file:" + schemaFile
+          + ", error:" + e.getMessage();
+      LOG.error(msg);
+      throw new RuntimeException(msg);
+    }
+  }
+
+  @VisibleForTesting
+  public void init(NodeSchema[] schemas, boolean enforce) {
+    allSchema = new ArrayList<>();
+    allSchema.addAll(Arrays.asList(schemas));
+    enforcePrefix = enforce;
+    maxLevel = schemas.length;
+  }
+
+  public int getMaxLevel() {
+    return maxLevel;
+  }
+
+  public int getCost(int level) {
+    Preconditions.checkArgument(level <= maxLevel &&
+        level >= (NetConstants.ROOT_LEVEL));
+    return allSchema.get(level - NetConstants.ROOT_LEVEL).getCost();
+  }
+
+  /**
+   * Given an incomplete network path, return its complete network path if
+   * possible. E.g. input is 'node1', output is '/rack-default/node1' if this
+   * schema manages ROOT, RACK and LEAF, with prefix defined and enforce prefix
+   * enabled.
+   *
+   * @param path the incomplete input path
+   * @return the complete path, or null if the path cannot be completed or the
+   * completion failed
+   */
+  public String complete(String path) {
+    if (!enforcePrefix) {
+      return null;
+    }
+    String normalizedPath = NetUtils.normalize(path);
+    String[] subPath = normalizedPath.split(NetConstants.PATH_SEPARATOR_STR);
+    if ((subPath.length) == maxLevel) {
+      return path;
+    }
+    StringBuilder newPath = new StringBuilder(NetConstants.ROOT);
+    // skip the ROOT and LEAF layer
+    int i, j;
+    for (i = 1, j = 1; i < subPath.length && j < (allSchema.size() - 1);) {
+      if (allSchema.get(j).matchPrefix(subPath[i])) {
+        newPath.append(NetConstants.PATH_SEPARATOR_STR).append(subPath[i]);
+        i++;
+        j++;
+      } else {
+        newPath.append(allSchema.get(j).getDefaultName());
+        j++;
+      }
+    }
+    if (i == (subPath.length - 1)) {
+      newPath.append(NetConstants.PATH_SEPARATOR_STR).append(subPath[i]);
+      return newPath.toString();
+    }
+    return null;
+  }
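+
+  // Usage sketch (assumes a ROOT/rack/LEAF schema with prefix "rack", a
+  // default rack name, and enforceprefix enabled): an incomplete path such
+  // as "node1" is expanded with the rack layer's default name.
+  //   String full = NodeSchemaManager.getInstance().complete("node1");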
+}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/package-info.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/package-info.java
new file mode 100644
index 0000000..375af7f
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/net/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.net;
+/**
+ The network topology supported by Ozone.
+ */
\ No newline at end of file
diff --git a/hadoop-hdds/common/src/main/resources/network-topology-default.xml b/hadoop-hdds/common/src/main/resources/network-topology-default.xml
new file mode 100644
index 0000000..f86597c
--- /dev/null
+++ b/hadoop-hdds/common/src/main/resources/network-topology-default.xml
@@ -0,0 +1,68 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<!--
+This is the default for network topology configuration. It defines
+level prefix key name, level default cost and max levels.
+-->
+<configuration>
+    <!-- The version of network topology configuration file format, it must be an integer -->
+    <layoutversion>1</layoutversion>
+    <layers>
+        <layer id="datacenter">
+            <prefix></prefix>
+            <cost>1</cost>
+            <type>Root</type>
+        </layer>
+        <!-- layer id is only used as the reference internally in this document -->
+        <layer id="rack">
+            <!-- prefix of the name of this layer. For example, if the prefix is "dc", then every
+            name in this layer should start with "dc", such as "dc1", "dc2", otherwise NetworkTopology
+            class should report error when add a node path which does't follow this rule. This field
+            is case insensitive. It is optional and can have empty or "" value, in all these cases
+            prefix check will not be enforced.
+            -->
+            <prefix>rack</prefix>
+            <!-- The default cost of this layer, a positive integer or 0. Can be overridden by the
+            "${cost}" value in a specific path. This field is also optional. When it's not defined,
+            its value defaults to "1".
+            -->
+            <cost>1</cost>
+            <!-- Layer type, optional field, case insensitive, default value InnerNode.
+            Current value range: {Root, InnerNode, Leaf}
+            A Leaf node can only appear at the end of the "path" field of the "topology" section.
+            The Root node is a special node. It doesn't have a name. It's represented by "/" at the beginning of the path.
+            -->
+            <type>InnerNode</type>
+            <!-- default name if this layer is missing. Only applies to InnerNode. Ignored for Leaf and Root nodes. -->
+            <default>/default-rack</default>
+        </layer>
+        <layer id="node">
+            <prefix></prefix>
+            <cost>0</cost>
+            <type>Leaf</type>
+        </layer>
+    </layers>
+    <topology>
+        <path>/datacenter/rack/node</path>
+        <!-- When this field is true, each InnerNode layer should have its prefix defined with a non-empty value,
+         otherwise the content is not valid. Default value is false.
+         -->
+        <enforceprefix>false</enforceprefix>
+    </topology>
+</configuration>
\ No newline at end of file
diff --git a/hadoop-hdds/common/src/main/resources/network-topology-nodegroup.xml b/hadoop-hdds/common/src/main/resources/network-topology-nodegroup.xml
new file mode 100644
index 0000000..b43ebd5
--- /dev/null
+++ b/hadoop-hdds/common/src/main/resources/network-topology-nodegroup.xml
@@ -0,0 +1,74 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<!--
+This is a nodegroup-aware network topology configuration. It defines level
+prefix key name, level default cost and max levels.
+-->
+<configuration>
+    <!-- The version of network topology configuration file format, it must be an integer -->
+    <layoutversion>1</layoutversion>
+    <layers>
+        <layer id="datacenter">
+            <prefix></prefix>
+            <cost>1</cost>
+            <type>Root</type>
+        </layer>
+        <!-- layer id is only used as the reference internally in this document -->
+        <layer id="rack">
+            <!-- prefix of the name of this layer. For example, if the prefix is "dc", then every
+            name in this layer should start with "dc", such as "dc1", "dc2", otherwise
+            NetworkTopology class should report error when add a node path which does't follow this
+            rule. This field is case insensitive. It is optional is and can have empty or "" value, in
+            all these cases prefix check will not be enforced.
+            -->
+            <prefix>rack</prefix>
+            <!-- The default cost of this layer, a positive integer or 0. It can be overridden
+            by the "${cost}" value in a specific path. This field is also optional; when it's
+            not defined, its value defaults to "1".
+            -->
+            <cost>1</cost>
+            <!-- Layer type, an optional, case-insensitive field with default value InnerNode.
+            Current value range: {Root, InnerNode, Leaf}
+            A Leaf node can only appear at the end of the "path" field of the "topology" section.
+            The Root node is special: it has no name and is represented by the "/" at the beginning of the path.
+            -->
+            <type>InnerNode</type>
+            <!-- Default name used if this layer is missing. Only applies to InnerNode; ignored for Leaf and Root nodes -->
+            <default>/default-rack</default>
+        </layer>
+        <layer id="nodegroup">
+            <prefix>ng</prefix>
+            <cost>1</cost>
+            <type>InnerNode</type>
+            <default>/default-nodegroup</default>
+        </layer>
+        <layer id="node">
+            <prefix></prefix>
+            <cost>0</cost>
+            <type>Leaf</type>
+        </layer>
+    </layers>
+    <topology>
+        <path>/datacenter/rack/nodegroup/node</path>
+        <!-- When this field is true, each InnerNode layer must have a non-empty prefix defined,
+         otherwise the content is invalid. The default value is false.
+        -->
+        <enforceprefix>false</enforceprefix>
+    </topology>
+</configuration>
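
The <default> values above drive path completion: as TestNodeSchemaManager
demonstrates further down, layers missing from a node path are filled in from
them. A minimal sketch under that assumption; the schema file location is
illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdds.scm.ScmConfigKeys;
    import org.apache.hadoop.hdds.scm.net.NodeSchemaManager;

    public final class CompleteSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Hypothetical path to the node-group schema shown above.
        conf.set(ScmConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE,
            "/etc/hadoop/network-topology-nodegroup.xml");
        NodeSchemaManager manager = NodeSchemaManager.getInstance();
        manager.init(conf);
        // Missing inner layers are filled from <default>:
        //   /node1       -> /default-rack/default-nodegroup/node1
        //   /rack1/node1 -> /rack1/default-nodegroup/node1
        // Paths matching no layer prefix return null:
        //   /dc1/node1   -> null
        System.out.println(manager.complete("/node1"));
        System.out.println(manager.complete("/rack1/node1"));
        System.out.println(manager.complete("/dc1/node1"));
      }
    }
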
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index bb59b9b..dd43c62 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -2269,4 +2269,11 @@
       ozone.metadata.dirs.
     </description>
   </property>
+  <property>
+    <name>ozone.scm.network.topology.schema.file</name>
+    <value>network-topology-default.xml</value>
+    <tag>OZONE, MANAGEMENT</tag>
+    <description>The schema file that defines the Ozone network topology.
+    </description>
+  </property>
 </configuration>
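
A minimal sketch of consuming this new property, mirroring testInitWithConfigFile
in the new TestNetworkTopologyImpl; the file path is illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdds.scm.ScmConfigKeys;
    import org.apache.hadoop.hdds.scm.net.NetworkTopology;
    import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;

    public final class TopologyFromConf {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Override the schema file configured by
        // ozone.scm.network.topology.schema.file (hypothetical path).
        conf.set(ScmConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE,
            "/etc/hadoop/my-topology.xml");
        NetworkTopology topology = new NetworkTopologyImpl(conf);
        System.out.println("max level = " + topology.getMaxLevel());
      }
    }
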
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/net/TestNetworkTopologyImpl.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/net/TestNetworkTopologyImpl.java
new file mode 100644
index 0000000..0edfb07
--- /dev/null
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/net/TestNetworkTopologyImpl.java
@@ -0,0 +1,922 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.net;
+
+import org.apache.hadoop.conf.Configuration;
+import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT;
+import static org.apache.hadoop.hdds.scm.net.NetConstants.PATH_SEPARATOR_STR;
+import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT_SCHEMA;
+import static org.apache.hadoop.hdds.scm.net.NetConstants.REGION_SCHEMA;
+import static org.apache.hadoop.hdds.scm.net.NetConstants.DATACENTER_SCHEMA;
+import static org.apache.hadoop.hdds.scm.net.NetConstants.RACK_SCHEMA;
+import static org.apache.hadoop.hdds.scm.net.NetConstants.NODEGROUP_SCHEMA;
+import static org.apache.hadoop.hdds.scm.net.NetConstants.LEAF_SCHEMA;
+
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.stream.Collectors;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeTrue;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.junit.runner.RunWith;
+
+/** Test the network topology functions. */
+@RunWith(Parameterized.class)
+public class TestNetworkTopologyImpl {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      TestNetworkTopologyImpl.class);
+  private NetworkTopology cluster;
+  private Node[] dataNodes;
+  private Random random = new Random();
+
+  public TestNetworkTopologyImpl(NodeSchema[] schemas, Node[] nodeArray) {
+    NodeSchemaManager.getInstance().init(schemas, true);
+    cluster = new NetworkTopologyImpl(NodeSchemaManager.getInstance());
+    dataNodes = nodeArray;
+    for (int i = 0; i < dataNodes.length; i++) {
+      cluster.add(dataNodes[i]);
+    }
+  }
+
+  @Rule
+  public Timeout testTimeout = new Timeout(3000000);
+
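+  // Parameterized topologies, from a flat cluster up to a five-layer
+  // region/datacenter/rack/nodegroup tree; each case pairs a schema with
+  // the datanodes registered under it.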
+  @Parameters
+  public static Collection<Object[]> setupDatanodes() {
+    Object[][] topologies = new Object[][]{
+        {new NodeSchema[] {ROOT_SCHEMA, LEAF_SCHEMA},
+            new Node[]{
+                createDatanode("1.1.1.1", "/"),
+                createDatanode("2.2.2.2", "/"),
+                createDatanode("3.3.3.3", "/"),
+                createDatanode("4.4.4.4", "/"),
+                createDatanode("5.5.5.5", "/"),
+                createDatanode("6.6.6.6", "/"),
+                createDatanode("7.7.7.7", "/"),
+                createDatanode("8.8.8.8", "/"),
+            }},
+        {new NodeSchema[] {ROOT_SCHEMA, RACK_SCHEMA, LEAF_SCHEMA},
+            new Node[]{
+                createDatanode("1.1.1.1", "/r1"),
+                createDatanode("2.2.2.2", "/r1"),
+                createDatanode("3.3.3.3", "/r2"),
+                createDatanode("4.4.4.4", "/r2"),
+                createDatanode("5.5.5.5", "/r2"),
+                createDatanode("6.6.6.6", "/r3"),
+                createDatanode("7.7.7.7", "/r3"),
+                createDatanode("8.8.8.8", "/r3"),
+            }},
+        {new NodeSchema[]
+            {ROOT_SCHEMA, DATACENTER_SCHEMA, RACK_SCHEMA, LEAF_SCHEMA},
+            new Node[]{
+                createDatanode("1.1.1.1", "/d1/r1"),
+                createDatanode("2.2.2.2", "/d1/r1"),
+                createDatanode("3.3.3.3", "/d1/r2"),
+                createDatanode("4.4.4.4", "/d1/r2"),
+                createDatanode("5.5.5.5", "/d1/r2"),
+                createDatanode("6.6.6.6", "/d2/r3"),
+                createDatanode("7.7.7.7", "/d2/r3"),
+                createDatanode("8.8.8.8", "/d2/r3"),
+            }},
+        {new NodeSchema[] {ROOT_SCHEMA, DATACENTER_SCHEMA, RACK_SCHEMA,
+            NODEGROUP_SCHEMA, LEAF_SCHEMA},
+            new Node[]{
+                createDatanode("1.1.1.1", "/d1/r1/ng1"),
+                createDatanode("2.2.2.2", "/d1/r1/ng1"),
+                createDatanode("3.3.3.3", "/d1/r2/ng2"),
+                createDatanode("4.4.4.4", "/d1/r2/ng2"),
+                createDatanode("5.5.5.5", "/d1/r2/ng3"),
+                createDatanode("6.6.6.6", "/d2/r3/ng3"),
+                createDatanode("7.7.7.7", "/d2/r3/ng3"),
+                createDatanode("8.8.8.8", "/d2/r3/ng3"),
+                createDatanode("9.9.9.9", "/d3/r1/ng1"),
+                createDatanode("10.10.10.10", "/d3/r1/ng1"),
+                createDatanode("11.11.11.11", "/d3/r1/ng1"),
+                createDatanode("12.12.12.12", "/d3/r2/ng2"),
+                createDatanode("13.13.13.13", "/d3/r2/ng2"),
+                createDatanode("14.14.14.14", "/d4/r1/ng1"),
+                createDatanode("15.15.15.15", "/d4/r1/ng1"),
+                createDatanode("16.16.16.16", "/d4/r1/ng1"),
+                createDatanode("17.17.17.17", "/d4/r1/ng2"),
+                createDatanode("18.18.18.18", "/d4/r1/ng2"),
+                createDatanode("19.19.19.19", "/d4/r1/ng3"),
+                createDatanode("20.20.20.20", "/d4/r1/ng3"),
+            }},
+        {new NodeSchema[] {ROOT_SCHEMA, REGION_SCHEMA, DATACENTER_SCHEMA,
+            RACK_SCHEMA, NODEGROUP_SCHEMA, LEAF_SCHEMA},
+            new Node[]{
+                createDatanode("1.1.1.1", "/d1/rg1/r1/ng1"),
+                createDatanode("2.2.2.2", "/d1/rg1/r1/ng1"),
+                createDatanode("3.3.3.3", "/d1/rg1/r1/ng2"),
+                createDatanode("4.4.4.4", "/d1/rg1/r1/ng1"),
+                createDatanode("5.5.5.5", "/d1/rg1/r1/ng1"),
+                createDatanode("6.6.6.6", "/d1/rg1/r1/ng2"),
+                createDatanode("7.7.7.7", "/d1/rg1/r1/ng2"),
+                createDatanode("8.8.8.8", "/d1/rg1/r1/ng2"),
+                createDatanode("9.9.9.9", "/d1/rg1/r1/ng2"),
+                createDatanode("10.10.10.10", "/d1/rg1/r1/ng2"),
+                createDatanode("11.11.11.11", "/d1/rg1/r2/ng1"),
+                createDatanode("12.12.12.12", "/d1/rg1/r2/ng1"),
+                createDatanode("13.13.13.13", "/d1/rg1/r2/ng1"),
+                createDatanode("14.14.14.14", "/d1/rg1/r2/ng1"),
+                createDatanode("15.15.15.15", "/d1/rg1/r2/ng1"),
+                createDatanode("16.16.16.16", "/d1/rg1/r2/ng2"),
+                createDatanode("17.17.17.17", "/d1/rg1/r2/ng2"),
+                createDatanode("18.18.18.18", "/d1/rg1/r2/ng2"),
+                createDatanode("19.19.19.19", "/d1/rg1/r2/ng2"),
+                createDatanode("20.20.20.20", "/d1/rg1/r2/ng2"),
+                createDatanode("21.21.21.21", "/d2/rg1/r2/ng1"),
+                createDatanode("22.22.22.22", "/d2/rg1/r2/ng1"),
+                createDatanode("23.23.23.23", "/d2/rg2/r2/ng1"),
+                createDatanode("24.24.24.24", "/d2/rg2/r2/ng1"),
+                createDatanode("25.25.25.25", "/d2/rg2/r2/ng1"),
+            }}
+    };
+    return Arrays.asList(topologies);
+  }
+
+  @Test
+  public void testContains() {
+    Node nodeNotInMap = createDatanode("8.8.8.8", "/d2/r4");
+    for (int i=0; i < dataNodes.length; i++) {
+      assertTrue(cluster.contains(dataNodes[i]));
+    }
+    assertFalse(cluster.contains(nodeNotInMap));
+  }
+
+  @Test
+  public void testNumOfChildren() {
+    assertEquals(dataNodes.length, cluster.getNumOfLeafNode(null));
+    assertEquals(0, cluster.getNumOfLeafNode("/switch1/node1"));
+  }
+
+  @Test
+  public void testGetNode() {
+    assertEquals(cluster.getNode(""), cluster.getNode(null));
+    assertEquals(cluster.getNode(""), cluster.getNode("/"));
+    assertNull(cluster.getNode("/switch1/node1"));
+    assertNull(cluster.getNode("/switch1"));
+  }
+
+  @Test
+  public void testCreateInvalidTopology() {
+    List<NodeSchema> schemas = new ArrayList<NodeSchema>();
+    schemas.add(ROOT_SCHEMA);
+    schemas.add(RACK_SCHEMA);
+    schemas.add(LEAF_SCHEMA);
+    NodeSchemaManager.getInstance().init(schemas.toArray(new NodeSchema[0]),
+        true);
+    NetworkTopology newCluster = new NetworkTopologyImpl(
+        NodeSchemaManager.getInstance());
+    Node[] invalidDataNodes = new Node[] {
+        createDatanode("1.1.1.1", "/r1"),
+        createDatanode("2.2.2.2", "/r2"),
+        createDatanode("3.3.3.3", "/d1/r2")
+    };
+    newCluster.add(invalidDataNodes[0]);
+    newCluster.add(invalidDataNodes[1]);
+    try {
+      newCluster.add(invalidDataNodes[2]);
+      fail("expected InvalidTopologyException");
+    } catch (NetworkTopology.InvalidTopologyException e) {
+      assertTrue(e.getMessage().contains("Failed to add"));
+      assertTrue(e.getMessage().contains("Its path depth is not " +
+          newCluster.getMaxLevel()));
+    }
+  }
+
+  @Test
+  public void testInitWithConfigFile() {
+    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+    Configuration conf = new Configuration();
+    try {
+      String filePath = classLoader.getResource(
+          "./networkTopologyTestFiles/good.xml").getPath();
+      conf.set(ScmConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE, filePath);
+      NetworkTopology newCluster = new NetworkTopologyImpl(conf);
+      LOG.info("network topology max level = " + newCluster.getMaxLevel());
+    } catch (Throwable e) {
+      fail("should succeed");
+    }
+  }
+
+  @Test
+  public void testAncestor() {
+    assumeTrue(cluster.getMaxLevel() > 2);
+    int maxLevel = cluster.getMaxLevel();
+    assertTrue(cluster.isSameParent(dataNodes[0], dataNodes[1]));
+    while(maxLevel > 1) {
+      assertTrue(cluster.isSameAncestor(dataNodes[0], dataNodes[1],
+          maxLevel - 1));
+      maxLevel--;
+    }
+    assertFalse(cluster.isSameParent(dataNodes[1], dataNodes[2]));
+    assertFalse(cluster.isSameParent(null, dataNodes[2]));
+    assertFalse(cluster.isSameParent(dataNodes[1], null));
+    assertFalse(cluster.isSameParent(null, null));
+
+    assertFalse(cluster.isSameAncestor(dataNodes[1], dataNodes[2], 0));
+    assertFalse(cluster.isSameAncestor(dataNodes[1], null, 1));
+    assertFalse(cluster.isSameAncestor(null, dataNodes[2], 1));
+    assertFalse(cluster.isSameAncestor(null, null, 1));
+
+    maxLevel = cluster.getMaxLevel();
+    assertTrue(cluster.isSameAncestor(
+        dataNodes[random.nextInt(cluster.getNumOfLeafNode(null))],
+        dataNodes[random.nextInt(cluster.getNumOfLeafNode(null))],
+        maxLevel - 1));
+  }
+
+  @Test
+  public void testAddRemove() {
+    for(int i = 0; i < dataNodes.length; i++) {
+      cluster.remove(dataNodes[i]);
+    }
+    for(int i = 0; i < dataNodes.length; i++) {
+      assertFalse(cluster.contains(dataNodes[i]));
+    }
+    // no leaf nodes
+    assertEquals(0, cluster.getNumOfLeafNode(null));
+    // no inner nodes
+    assertEquals(0, cluster.getNumOfNodes(2));
+    for(int i = 0; i < dataNodes.length; i++) {
+      cluster.add(dataNodes[i]);
+    }
+    // Inner nodes are created automatically
+    assertTrue(cluster.getNumOfNodes(2) > 0);
+
+    try {
+      cluster.add(cluster.chooseRandom(null).getParent());
+      fail("Inner node can not be added manually");
+    } catch (Exception e) {
+      assertTrue(e.getMessage().startsWith(
+          "Not allowed to add an inner node"));
+    }
+
+    try {
+      cluster.remove(cluster.chooseRandom(null).getParent());
+      fail("Inner node can not be removed manually");
+    } catch (Exception e) {
+      assertTrue(e.getMessage().startsWith(
+          "Not allowed to remove an inner node"));
+    }
+  }
+
+  @Test
+  public void testGetNodesWithLevel() {
+    int maxLevel = cluster.getMaxLevel();
+    try {
+      assertEquals(1, cluster.getNumOfNodes(0));
+      fail("level 0 is not supported");
+    } catch (IllegalArgumentException e) {
+      assertTrue(e.getMessage().startsWith("Invalid level"));
+    }
+
+    try {
+      assertEquals(1, cluster.getNumOfNodes(maxLevel + 1));
+      fail("level out of scope");
+    } catch (IllegalArgumentException e) {
+      assertTrue(e.getMessage().startsWith("Invalid level"));
+    }
+    // root node
+    assertEquals(1, cluster.getNumOfNodes(1));
+    // leaf nodes
+    assertEquals(dataNodes.length, cluster.getNumOfNodes(maxLevel));
+  }
+
+  @Test
+  public void testChooseRandomSimple() {
+    String path =
+        dataNodes[random.nextInt(dataNodes.length)].getNetworkFullPath();
+    assertEquals(path, cluster.chooseRandom(path).getNetworkFullPath());
+    path = path.substring(0, path.lastIndexOf(PATH_SEPARATOR_STR));
+    // test chooseRandom(String scope)
+    while (!path.equals(ROOT)) {
+      assertTrue(cluster.chooseRandom(path).getNetworkLocation()
+          .startsWith(path));
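+      // A scope prefixed with "~" excludes that subtree, so the pick must
+      // land outside path.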
+      Node node = cluster.chooseRandom("~" + path);
+      assertFalse(node.getNetworkLocation().startsWith(path));
+      path = path.substring(0,
+          path.lastIndexOf(PATH_SEPARATOR_STR));
+    }
+    assertNotNull(cluster.chooseRandom(null));
+    assertNotNull(cluster.chooseRandom(""));
+    assertNotNull(cluster.chooseRandom("/"));
+    assertNull(cluster.chooseRandom("~"));
+    assertNull(cluster.chooseRandom("~/"));
+
+    // test chooseRandom(String scope, String excludedScope)
+    path = dataNodes[random.nextInt(dataNodes.length)].getNetworkFullPath();
+    assertNull(cluster.chooseRandom(path, path));
+    assertNotNull(cluster.chooseRandom(null, path));
+    assertNotNull(cluster.chooseRandom("", path));
+
+    // test chooseRandom(String scope, Collection<Node> excludedNodes)
+    assertNull(cluster.chooseRandom("", Arrays.asList(dataNodes)));
+    assertNull(cluster.chooseRandom("/", Arrays.asList(dataNodes)));
+    assertNull(cluster.chooseRandom("~", Arrays.asList(dataNodes)));
+    assertNull(cluster.chooseRandom("~/", Arrays.asList(dataNodes)));
+    assertNull(cluster.chooseRandom(null, Arrays.asList(dataNodes)));
+  }
+
+  /**
+   * Following test checks that chooseRandom works for an excluded scope.
+   */
+  @Test
+  public void testChooseRandomExcludedScope() {
+    int[] excludedNodeIndexs = {0, dataNodes.length - 1,
+        random.nextInt(dataNodes.length), random.nextInt(dataNodes.length)};
+    String scope;
+    Map<Node, Integer> frequency;
+    for (int i : excludedNodeIndexs) {
+      String path = dataNodes[i].getNetworkFullPath();
+      while (!path.equals(ROOT)) {
+        scope = "~" + path;
+        frequency = pickNodesAtRandom(100, scope, null, 0);
+        for (Node key : dataNodes) {
+          if (key.getNetworkFullPath().startsWith(path)) {
+            assertTrue(frequency.get(key) == 0);
+          }
+        }
+        path = path.substring(0, path.lastIndexOf(PATH_SEPARATOR_STR));
+      }
+    }
+
+    // null excludedScope, every node should be chosen
+    frequency = pickNodes(100, null, null, null, 0);
+    for (Node key : dataNodes) {
+      assertTrue(frequency.get(key) != 0);
+    }
+
+    // "" excludedScope,  no node will ever be chosen
+    frequency = pickNodes(100, "", null, null, 0);
+    for (Node key : dataNodes) {
+      assertTrue(frequency.get(key) == 0);
+    }
+
+    // "~" scope, no node will ever be chosen
+    scope = "~";
+    frequency = pickNodesAtRandom(100, scope, null, 0);
+    for (Node key : dataNodes) {
+      assertTrue(frequency.get(key) == 0);
+    }
+    // excluded scope outside the network topology: every node should be chosen
+    scope = "/city1";
+    frequency = pickNodes(cluster.getNumOfLeafNode(null), scope, null, null, 0);
+    for (Node key : dataNodes) {
+      assertTrue(frequency.get(key) != 0);
+    }
+  }
+
+  /**
+   * Following test checks that chooseRandom works for excluded nodes.
+   */
+  @Test
+  public void testChooseRandomExcludedNode() {
+    Node[][] excludedNodeLists = {
+        {},
+        {dataNodes[0]},
+        {dataNodes[dataNodes.length - 1]},
+        {dataNodes[random.nextInt(dataNodes.length)]},
+        {dataNodes[random.nextInt(dataNodes.length)],
+            dataNodes[random.nextInt(dataNodes.length)]
+        },
+        {dataNodes[random.nextInt(dataNodes.length)],
+            dataNodes[random.nextInt(dataNodes.length)],
+            dataNodes[random.nextInt(dataNodes.length)],
+        }};
+    int leafNum = cluster.getNumOfLeafNode(null);
+    Map<Node, Integer> frequency;
+    for(Node[] list : excludedNodeLists) {
+      List<Node> excludedList = Arrays.asList(list);
+      int ancestorGen = 0;
+      while(ancestorGen < cluster.getMaxLevel()) {
+        frequency = pickNodesAtRandom(leafNum, null, excludedList, ancestorGen);
+        List<Node> ancestorList = NetUtils.getAncestorList(cluster,
+            excludedList, ancestorGen);
+        for (Node key : dataNodes) {
+          if (excludedList.contains(key) ||
+              (ancestorList.size() > 0 &&
+                  ancestorList.stream()
+                      .map(a -> (InnerNode) a)
+                      .filter(a -> a.isAncestor(key))
+                      .collect(Collectors.toList()).size() > 0)) {
+            assertTrue(frequency.get(key) == 0);
+          }
+        }
+        ancestorGen++;
+      }
+    }
+    // all nodes excluded, no node will be picked
+    List<Node> excludedList = Arrays.asList(dataNodes);
+    int ancestorGen = 0;
+    while(ancestorGen < cluster.getMaxLevel()) {
+      frequency = pickNodesAtRandom(leafNum, null, excludedList, ancestorGen);
+      for (Node key : dataNodes) {
+        assertTrue(frequency.get(key) == 0);
+      }
+      ancestorGen++;
+    }
+    // excluded nodes outside the topology scope: each node will be picked
+    excludedList = Arrays.asList(createDatanode("1.1.1.1.", "/city1/rack1"));
+    ancestorGen = 0;
+    while(ancestorGen < cluster.getMaxLevel()) {
+      frequency = pickNodes(leafNum, null, excludedList, null, ancestorGen);
+      for (Node key : dataNodes) {
+        assertTrue(frequency.get(key) != 0);
+      }
+      ancestorGen++;
+    }
+  }
+
+  /**
+   * Following test checks that chooseRandom works for excluded nodes and scope.
+   */
+  @Test
+  public void testChooseRandomExcludedNodeAndScope() {
+    int[] excludedNodeIndexs = {0, dataNodes.length - 1,
+        random.nextInt(dataNodes.length), random.nextInt(dataNodes.length)};
+    Node[][] excludedNodeLists = {
+        {},
+        {dataNodes[0]},
+        {dataNodes[dataNodes.length - 1]},
+        {dataNodes[random.nextInt(dataNodes.length)]},
+        {dataNodes[random.nextInt(dataNodes.length)],
+            dataNodes[random.nextInt(dataNodes.length)]
+        },
+        {dataNodes[random.nextInt(dataNodes.length)],
+            dataNodes[random.nextInt(dataNodes.length)],
+            dataNodes[random.nextInt(dataNodes.length)],
+        }};
+    int leafNum = cluster.getNumOfLeafNode(null);
+    Map<Node, Integer> frequency;
+    String scope;
+    for (int i : excludedNodeIndexs) {
+      String path = dataNodes[i].getNetworkFullPath();
+      while (!path.equals(ROOT)) {
+        scope = "~" + path;
+        int ancestorGen = 0;
+        while(ancestorGen < cluster.getMaxLevel()) {
+          for (Node[] list : excludedNodeLists) {
+            List<Node> excludedList = Arrays.asList(list);
+            frequency =
+                pickNodesAtRandom(leafNum, scope, excludedList, ancestorGen);
+            List<Node> ancestorList = NetUtils.getAncestorList(cluster,
+                excludedList, ancestorGen);
+            for (Node key : dataNodes) {
+              if (excludedList.contains(key) ||
+                  key.getNetworkFullPath().startsWith(path) ||
+                  (ancestorList.size() > 0 &&
+                      ancestorList.stream()
+                          .map(a -> (InnerNode) a)
+                          .filter(a -> a.isAncestor(key))
+                          .collect(Collectors.toList()).size() > 0)) {
+                assertTrue(frequency.get(key) == 0);
+              }
+            }
+          }
+          ancestorGen++;
+        }
+        path = path.substring(0, path.lastIndexOf(PATH_SEPARATOR_STR));
+      }
+    }
+    // all nodes excluded, no node will be picked
+    List<Node> excludedList = Arrays.asList(dataNodes);
+    for (int i : excludedNodeIndexs) {
+      String path = dataNodes[i].getNetworkFullPath();
+      while (!path.equals(ROOT)) {
+        scope = "~" + path;
+        int ancestorGen = 0;
+        while (ancestorGen < cluster.getMaxLevel()) {
+          frequency =
+              pickNodesAtRandom(leafNum, scope, excludedList, ancestorGen);
+          for (Node key : dataNodes) {
+            assertTrue(frequency.get(key) == 0);
+          }
+          ancestorGen++;
+        }
+        path = path.substring(0, path.lastIndexOf(PATH_SEPARATOR_STR));
+      }
+    }
+
+    // no node excluded and no excluded scope, each node will be picked
+    int ancestorGen = 0;
+    while (ancestorGen < cluster.getMaxLevel()) {
+      frequency = pickNodes(leafNum, null, null, null, ancestorGen);
+      for (Node key : dataNodes) {
+        assertTrue(frequency.get(key) != 0);
+      }
+      ancestorGen++;
+    }
+  }
+
+  /**
+   * Following test checks that chooseRandom works with excluded nodes, an
+   * excluded scope, an affinity node, and ancestor generation.
+   */
+  @Test
+  public void testChooseRandomWithAffinityNode() {
+    int[] excludedNodeIndexs = {0, dataNodes.length - 1,
+        random.nextInt(dataNodes.length), random.nextInt(dataNodes.length)};
+    Node[][] excludedNodeLists = {
+        {},
+        {dataNodes[0]},
+        {dataNodes[dataNodes.length - 1]},
+        {dataNodes[random.nextInt(dataNodes.length)]},
+        {dataNodes[random.nextInt(dataNodes.length)],
+            dataNodes[random.nextInt(dataNodes.length)]
+        },
+        {dataNodes[random.nextInt(dataNodes.length)],
+            dataNodes[random.nextInt(dataNodes.length)],
+            dataNodes[random.nextInt(dataNodes.length)],
+        }};
+    int[] affinityNodeIndexs = {0, dataNodes.length - 1,
+        random.nextInt(dataNodes.length), random.nextInt(dataNodes.length)};
+    int leafNum = cluster.getNumOfLeafNode(null);
+    Map<Node, Integer> frequency;
+    String scope;
+    for (int k : affinityNodeIndexs) {
+      for (int i : excludedNodeIndexs) {
+        String path = dataNodes[i].getNetworkFullPath();
+        while (!path.equals(ROOT)) {
+          int ancestorGen = cluster.getMaxLevel() - 1;
+          while (ancestorGen > 0) {
+            for (Node[] list : excludedNodeLists) {
+              List<Node> excludedList = Arrays.asList(list);
+              frequency = pickNodes(leafNum, path, excludedList, dataNodes[k],
+                  ancestorGen);
+              Node affinityAncestor = dataNodes[k].getAncestor(ancestorGen);
+              for (Node key : dataNodes) {
+                if (affinityAncestor != null) {
+                  if (frequency.get(key) > 0) {
+                    assertTrue(affinityAncestor.isAncestor(key));
+                  } else if (!affinityAncestor.isAncestor(key)) {
+                    continue;
+                  } else if (excludedList != null &&
+                      excludedList.contains(key)) {
+                    continue;
+                  } else if (path != null &&
+                      key.getNetworkFullPath().startsWith(path)) {
+                    continue;
+                  } else {
+                    fail("Node is not picked when sequentially going " +
+                        "through ancestor node's leaf nodes. node:" +
+                        key.getNetworkFullPath() + ", ancestor node:" +
+                        affinityAncestor.getNetworkFullPath() +
+                        ", excludedScope: " + path + ", " + "excludedList:" +
+                        (excludedList == null ? "" : excludedList.toString()));
+                  }
+                }
+              }
+            }
+            ancestorGen--;
+          }
+          path = path.substring(0, path.lastIndexOf(PATH_SEPARATOR_STR));
+        }
+      }
+    }
+
+    // all nodes excluded, no node will be picked
+    List<Node> excludedList = Arrays.asList(dataNodes);
+    for (int k : affinityNodeIndexs) {
+      for (int i : excludedNodeIndexs) {
+        String path = dataNodes[i].getNetworkFullPath();
+        while (!path.equals(ROOT)) {
+          scope = "~" + path;
+          int ancestorGen = 0;
+          while (ancestorGen < cluster.getMaxLevel()) {
+            frequency = pickNodesAtRandom(leafNum, scope, excludedList,
+                dataNodes[k], ancestorGen);
+            for (Node key : dataNodes) {
+              assertTrue(frequency.get(key) == 0);
+            }
+            ancestorGen++;
+          }
+          path = path.substring(0, path.lastIndexOf(PATH_SEPARATOR_STR));
+        }
+      }
+    }
+    // no node excluded and no excluded scope, each node will be picked
+    int ancestorGen = cluster.getMaxLevel() - 1;
+    for (int k : affinityNodeIndexs) {
+      while (ancestorGen > 0) {
+        frequency =
+            pickNodes(leafNum, null, null, dataNodes[k], ancestorGen);
+        Node affinityAncestor = dataNodes[k].getAncestor(ancestorGen);
+        for (Node key : dataNodes) {
+          if (frequency.get(key) > 0) {
+            if (affinityAncestor != null) {
+              assertTrue(affinityAncestor.isAncestor(key));
+            }
+          }
+        }
+        ancestorGen--;
+      }
+    }
+    // check invalid ancestor generation
+    try {
+      cluster.chooseRandom(null, null, null, dataNodes[0],
+          cluster.getMaxLevel());
+      fail("ancestor generation exceeds max level, should fail");
+    } catch (Exception e) {
+      assertTrue(e.getMessage().startsWith("ancestorGen " +
+          cluster.getMaxLevel() +
+          " exceeds this network topology acceptable level"));
+    }
+  }
+
+  @Test
+  public void testCost() {
+    // network topology with default cost
+    List<NodeSchema> schemas = new ArrayList<>();
+    schemas.add(ROOT_SCHEMA);
+    schemas.add(RACK_SCHEMA);
+    schemas.add(NODEGROUP_SCHEMA);
+    schemas.add(LEAF_SCHEMA);
+
+    NodeSchemaManager manager = NodeSchemaManager.getInstance();
+    manager.init(schemas.toArray(new NodeSchema[0]), true);
+    NetworkTopology newCluster =
+        new NetworkTopologyImpl(manager);
+    Node[] nodeList = new Node[] {
+        createDatanode("1.1.1.1", "/r1/ng1"),
+        createDatanode("2.2.2.2", "/r1/ng1"),
+        createDatanode("3.3.3.3", "/r1/ng2"),
+        createDatanode("4.4.4.4", "/r2/ng1"),
+    };
+    for (Node node: nodeList) {
+      newCluster.add(node);
+    }
+    Node outScopeNode1 = createDatanode("5.5.5.5", "/r2/ng2");
+    Node outScopeNode2 = createDatanode("6.6.6.6", "/r2/ng2");
+    assertEquals(Integer.MAX_VALUE,
+        newCluster.getDistanceCost(nodeList[0], null));
+    assertEquals(Integer.MAX_VALUE,
+        newCluster.getDistanceCost(null, nodeList[0]));
+    assertEquals(Integer.MAX_VALUE,
+        newCluster.getDistanceCost(outScopeNode1, nodeList[0]));
+    assertEquals(Integer.MAX_VALUE,
+        newCluster.getDistanceCost(nodeList[0], outScopeNode1));
+    assertEquals(Integer.MAX_VALUE,
+        newCluster.getDistanceCost(outScopeNode1, outScopeNode2));
+
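+    // Each step toward the common ancestor adds the cost of the layer
+    // reached, counted from both endpoints: same nodegroup = 1 + 1 = 2,
+    // same rack = (1 + 1) * 2 = 4, different racks = (1 + 1 + 1) * 2 = 6.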
+    assertEquals(0, newCluster.getDistanceCost(null, null));
+    assertEquals(0, newCluster.getDistanceCost(nodeList[0], nodeList[0]));
+    assertEquals(2, newCluster.getDistanceCost(nodeList[0], nodeList[1]));
+    assertEquals(4, newCluster.getDistanceCost(nodeList[0], nodeList[2]));
+    assertEquals(6, newCluster.getDistanceCost(nodeList[0], nodeList[3]));
+
+    // network topology with customized cost
+    schemas.clear();
+    schemas.add(new NodeSchema.Builder()
+        .setType(NodeSchema.LayerType.ROOT).setCost(5).build());
+    schemas.add(new NodeSchema.Builder()
+        .setType(NodeSchema.LayerType.INNER_NODE).setCost(3).build());
+    schemas.add(new NodeSchema.Builder()
+        .setType(NodeSchema.LayerType.INNER_NODE).setCost(1).build());
+    schemas.add(new NodeSchema.Builder()
+        .setType(NodeSchema.LayerType.LEAF_NODE).build());
+    manager = NodeSchemaManager.getInstance();
+    manager.init(schemas.toArray(new NodeSchema[0]), true);
+    newCluster = new NetworkTopologyImpl(manager);
+    for (Node node: nodeList) {
+      newCluster.add(node);
+    }
+    assertEquals(Integer.MAX_VALUE,
+        newCluster.getDistanceCost(nodeList[0], null));
+    assertEquals(Integer.MAX_VALUE,
+        newCluster.getDistanceCost(null, nodeList[0]));
+    assertEquals(Integer.MAX_VALUE,
+        newCluster.getDistanceCost(outScopeNode1, nodeList[0]));
+    assertEquals(Integer.MAX_VALUE,
+        newCluster.getDistanceCost(nodeList[0], outScopeNode1));
+    assertEquals(Integer.MAX_VALUE,
+        newCluster.getDistanceCost(outScopeNode1, outScopeNode2));
+
+    assertEquals(0, newCluster.getDistanceCost(null, null));
+    assertEquals(0, newCluster.getDistanceCost(nodeList[0], nodeList[0]));
+    assertEquals(2, newCluster.getDistanceCost(nodeList[0], nodeList[1]));
+    assertEquals(8, newCluster.getDistanceCost(nodeList[0], nodeList[2]));
+    assertEquals(18, newCluster.getDistanceCost(nodeList[0], nodeList[3]));
+  }
+
+  @Test
+  public void testSortByDistanceCost() {
+    Node[][] nodes = {
+        {},
+        {dataNodes[0]},
+        {dataNodes[dataNodes.length - 1]},
+        {dataNodes[random.nextInt(dataNodes.length)]},
+        {dataNodes[random.nextInt(dataNodes.length)],
+            dataNodes[random.nextInt(dataNodes.length)]
+        },
+        {dataNodes[random.nextInt(dataNodes.length)],
+            dataNodes[random.nextInt(dataNodes.length)],
+            dataNodes[random.nextInt(dataNodes.length)],
+        },
+        {dataNodes[random.nextInt(dataNodes.length)],
+            dataNodes[random.nextInt(dataNodes.length)],
+            dataNodes[random.nextInt(dataNodes.length)],
+            dataNodes[random.nextInt(dataNodes.length)],
+        },
+        {dataNodes[random.nextInt(dataNodes.length)],
+            dataNodes[random.nextInt(dataNodes.length)],
+            dataNodes[random.nextInt(dataNodes.length)],
+            dataNodes[random.nextInt(dataNodes.length)],
+            dataNodes[random.nextInt(dataNodes.length)],
+        }};
+    Node[] readers = {null, dataNodes[0], dataNodes[dataNodes.length - 1],
+        dataNodes[random.nextInt(dataNodes.length)],
+        dataNodes[random.nextInt(dataNodes.length)],
+        dataNodes[random.nextInt(dataNodes.length)]
+    };
+    for (Node reader : readers) {
+      for (Node[] nodeList : nodes) {
+        int length = nodeList.length;
+        while (length > 0) {
+          cluster.sortByDistanceCost(reader, nodeList, length);
+          for (int i = 0; i < nodeList.length; i++) {
+            if ((i + 1) < nodeList.length) {
+              int cost1 = cluster.getDistanceCost(reader, nodeList[i]);
+              int cost2 = cluster.getDistanceCost(reader, nodeList[i + 1]);
+              assertTrue("reader:" + (reader != null ?
+                  reader.getNetworkFullPath() : "null") +
+                  ",node1:" + nodeList[i].getNetworkFullPath() +
+                  ",node2:" + nodeList[i + 1].getNetworkFullPath() +
+                  ",cost1:" + cost1 + ",cost2:" + cost2,
+                  cost1 == Integer.MAX_VALUE || cost1 <= cost2);
+            }
+          }
+          length--;
+        }
+      }
+    }
+
+    // sort all nodes
+    Node[] nodeList = dataNodes.clone();
+    for (Node reader : readers) {
+      int length = nodeList.length;
+      while (length >= 0) {
+        cluster.sortByDistanceCost(reader, nodeList, length);
+        for (int i = 0; i < nodeList.length; i++) {
+          if ((i + 1) < nodeList.length) {
+            int cost1 = cluster.getDistanceCost(reader, nodeList[i]);
+            int cost2 = cluster.getDistanceCost(reader, nodeList[i + 1]);
+            // node can be removed when called in testConcurrentAccess
+            assertTrue("reader:" + (reader != null ?
+                reader.getNetworkFullPath() : "null") +
+                ",node1:" + nodeList[i].getNetworkFullPath() +
+                ",node2:" + nodeList[i + 1].getNetworkFullPath() +
+                ",cost1:" + cost1 + ",cost2:" + cost2,
+                cost1 == Integer.MAX_VALUE || cost1 <= cost2);
+          }
+        }
+        length--;
+      }
+    }
+  }
+
+  private static Node createDatanode(String name, String path) {
+    return new NodeImpl(name, path, NetConstants.NODE_COST_DEFAULT);
+  }
+
+  /**
+   * This picks a large number of nodes at random in order to ensure coverage.
+   *
+   * @param numNodes the number of nodes
+   * @param excludedScope the excluded scope
+   * @param excludedNodes the excluded node list
+   * @param ancestorGen the chosen node cannot share the same ancestor at
+   *                    this generation with excludedNodes
+   * @return the frequency that nodes were chosen
+   */
+  private Map<Node, Integer> pickNodesAtRandom(int numNodes,
+      String excludedScope, Collection<Node> excludedNodes, int ancestorGen) {
+    Map<Node, Integer> frequency = new HashMap<Node, Integer>();
+    for (Node dnd : dataNodes) {
+      frequency.put(dnd, 0);
+    }
+    for (int j = 0; j < numNodes; j++) {
+      Node node = cluster.chooseRandom(excludedScope, excludedNodes,
+          ancestorGen);
+      if (node != null) {
+        frequency.put(node, frequency.get(node) + 1);
+      }
+    }
+    LOG.info("Result:" + frequency);
+    return frequency;
+  }
+
+  /**
+   * This picks a large number of nodes at random in order to ensure coverage.
+   *
+   * @param numNodes the number of nodes
+   * @param excludedScope the excluded scope
+   * @param excludedNodes the excluded node list
+   * @param affinityNode the chosen node should share the same ancestor at
+   *                     generation "ancestorGen" with this node
+   * @param ancestorGen  the chosen node cannot share the same ancestor at
+   *                     this generation with excludedNodes
+   * @return the frequency that nodes were chosen
+   */
+  private Map<Node, Integer> pickNodesAtRandom(int numNodes,
+      String excludedScope, Collection<Node> excludedNodes, Node affinityNode,
+      int ancestorGen) {
+    Map<Node, Integer> frequency = new HashMap<Node, Integer>();
+    for (Node dnd : dataNodes) {
+      frequency.put(dnd, 0);
+    }
+
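+    // Callers always pass an excludedScope starting with "~"; substring(1)
+    // strips it, since this chooseRandom overload takes the raw excluded path.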
+    for (int j = 0; j < numNodes; j++) {
+      Node node = cluster.chooseRandom("", excludedScope.substring(1),
+          excludedNodes, affinityNode, ancestorGen);
+      if (node != null) {
+        frequency.put(node, frequency.get(node) + 1);
+      }
+    }
+    LOG.info("Result:" + frequency);
+    return frequency;
+  }
+
+  /**
+   * This picks a large number of nodes sequentially.
+   *
+   * @param numNodes the number of nodes
+   * @param excludedScope the excluded scope, should not start with "~"
+   * @param excludedNodes the excluded node list
+   * @param affinityNode the chosen node should share the same ancestor at
+   *                     generation "ancestorGen" with this node
+   * @param ancestorGen  the chosen node cannot share the same ancestor at
+   *                     this generation with excludedNodes
+   * @return the frequency that nodes were chosen
+   */
+  private Map<Node, Integer> pickNodes(int numNodes, String excludedScope,
+      Collection<Node> excludedNodes, Node affinityNode, int ancestorGen) {
+    Map<Node, Integer> frequency = new HashMap<>();
+    for (Node dnd : dataNodes) {
+      frequency.put(dnd, 0);
+    }
+    excludedNodes = excludedNodes == null ? null :
+        excludedNodes.stream().distinct().collect(Collectors.toList());
+    for (int j = 0; j < numNodes; j++) {
+      Node node = cluster.getNode(j, null, excludedScope, excludedNodes,
+          affinityNode, ancestorGen);
+      if (node != null) {
+        frequency.put(node, frequency.get(node) + 1);
+      }
+    }
+
+    LOG.info("Result:" + frequency);
+    return frequency;
+  }
+}
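
Taken together, the surface these tests exercise looks roughly like the sketch
below; addresses and rack paths are illustrative, and only methods invoked by
the tests above appear:

    import org.apache.hadoop.hdds.scm.net.*;

    public final class TopologyUsageSketch {
      public static void main(String[] args) {
        // Assemble a root/rack/leaf topology the same way the test fixture does.
        NodeSchemaManager manager = NodeSchemaManager.getInstance();
        manager.init(new NodeSchema[] {NetConstants.ROOT_SCHEMA,
            NetConstants.RACK_SCHEMA, NetConstants.LEAF_SCHEMA}, true);
        NetworkTopology cluster = new NetworkTopologyImpl(manager);
        Node a = new NodeImpl("1.1.1.1", "/r1", NetConstants.NODE_COST_DEFAULT);
        Node b = new NodeImpl("2.2.2.2", "/r2", NetConstants.NODE_COST_DEFAULT);
        cluster.add(a);   // only leaves are added; inner nodes appear implicitly
        cluster.add(b);
        Node pick = cluster.chooseRandom("~/r1");  // a random node outside rack /r1
        int cost = cluster.getDistanceCost(a, b);  // layer costs summed via the common ancestor
        Node[] candidates = {a, b};
        cluster.sortByDistanceCost(a, candidates, candidates.length); // nearest first for reader a
        System.out.println(pick.getNetworkFullPath() + ", cost=" + cost);
      }
    }
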
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/net/TestNodeSchemaLoader.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/net/TestNodeSchemaLoader.java
new file mode 100644
index 0000000..6d9057c
--- /dev/null
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/net/TestNodeSchemaLoader.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.net;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/** Test the node schema loader. */
+@RunWith(Parameterized.class)
+public class TestNodeSchemaLoader {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestNodeSchemaLoader.class);
+  private ClassLoader classLoader =
+      Thread.currentThread().getContextClassLoader();
+
+  public TestNodeSchemaLoader(String schemaFile, String errMsg) {
+    try {
+      String filePath = classLoader.getResource(
+          "./networkTopologyTestFiles/" + schemaFile).getPath();
+      NodeSchemaLoader.getInstance().loadSchemaFromFile(filePath);
+      fail("expect exceptions");
+    } catch (Throwable e) {
+      assertTrue(e.getMessage().contains(errMsg));
+    }
+  }
+
+  @Rule
+  public Timeout testTimeout = new Timeout(30000);
+
+  @Parameters
+  public static Collection<Object[]> getSchemaFiles() {
+    Object[][] schemaFiles = new Object[][]{
+        {"enforce-error.xml", "layer without prefix defined"},
+        {"invalid-cost.xml", "Cost should be positive number or 0"},
+        {"multiple-leaf.xml", "Multiple LEAF layers are found"},
+        {"multiple-root.xml", "Multiple ROOT layers are found"},
+        {"no-leaf.xml", "No LEAF layer is found"},
+        {"no-root.xml", "No ROOT layer is found"},
+        {"path-layers-size-mismatch.xml",
+            "Topology path depth doesn't match layer element numbers"},
+        {"path-with-id-reference-failure.xml",
+            "No layer found for id"},
+        {"unknown-layer-type.xml", "Unsupported layer type"},
+        {"wrong-path-order-1.xml",
+            "Topology path doesn't start with ROOT layer"},
+        {"wrong-path-order-2.xml", "Topology path doesn't end with LEAF layer"},
+        {"no-topology.xml", "no or multiple <topology> element"},
+        {"multiple-topology.xml", "no or multiple <topology> element"},
+        {"invalid-version.xml", "Bad layoutversion value"},
+    };
+    return Arrays.asList(schemaFiles);
+  }
+
+  @Test
+  public void testGood() {
+    try {
+      String filePath = classLoader.getResource(
+          "./networkTopologyTestFiles/good.xml").getPath();
+      NodeSchemaLoader.getInstance().loadSchemaFromFile(filePath);
+    } catch (Throwable e) {
+      fail("should succeed, but got: " + e);
+    }
+  }
+
+  @Test
+  public void testNotExist() {
+    String filePath = classLoader.getResource(
+        "./networkTopologyTestFiles/good.xml").getPath() + ".backup";
+    try {
+      NodeSchemaLoader.getInstance().loadSchemaFromFile(filePath);
+      fail("should fail");
+    } catch (Throwable e) {
+      assertTrue(e.getMessage().contains("file " + filePath + " is not found"));
+    }
+  }
+}
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/net/TestNodeSchemaManager.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/net/TestNodeSchemaManager.java
new file mode 100644
index 0000000..7e30419
--- /dev/null
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/net/TestNodeSchemaManager.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.net;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.hdds.scm.net.NetConstants.DEFAULT_NODEGROUP;
+import static org.apache.hadoop.hdds.scm.net.NetConstants.DEFAULT_RACK;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/** Test the node schema loader. */
+public class TestNodeSchemaManager {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestNodeSchemaManager.class);
+  private ClassLoader classLoader =
+      Thread.currentThread().getContextClassLoader();
+  private NodeSchemaManager manager;
+  private Configuration conf;
+
+  public TestNodeSchemaManager() {
+    conf = new Configuration();
+    String filePath = classLoader.getResource(
+        "./networkTopologyTestFiles/good.xml").getPath();
+    conf.set(ScmConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE, filePath);
+    manager = NodeSchemaManager.getInstance();
+    manager.init(conf);
+  }
+
+  @Rule
+  public Timeout testTimeout = new Timeout(30000);
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testFailure1() {
+    manager.getCost(0);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testFailure2() {
+    manager.getCost(manager.getMaxLevel() + 1);
+  }
+
+  @Test
+  public void testPass() {
+    assertEquals(4, manager.getMaxLevel());
+    for (int i = 1; i <= manager.getMaxLevel(); i++) {
+      assertTrue(manager.getCost(i) == 1 || manager.getCost(i) == 0);
+    }
+  }
+
+  @Test
+  public void testInitFailure() {
+    String filePath = classLoader.getResource(
+        "./networkTopologyTestFiles/good.xml").getPath() + ".backup";
+    conf.set(ScmConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE, filePath);
+    try {
+      manager.init(conf);
+      fail("should fail");
+    } catch (Throwable e) {
+      assertTrue(e.getMessage().contains("Fail to load schema file:" +
+          filePath));
+    }
+  }
+
+  @Test
+  public void testComplete() {
+    // successful complete action
+    String path = "/node1";
+    assertEquals(DEFAULT_RACK + DEFAULT_NODEGROUP + path,
+        manager.complete(path));
+    assertEquals("/rack" + DEFAULT_NODEGROUP + path,
+        manager.complete("/rack" + path));
+    assertEquals(DEFAULT_RACK + "/nodegroup" + path,
+        manager.complete("/nodegroup" + path));
+
+    // failed complete action
+    assertEquals(null, manager.complete("/dc" + path));
+  }
+}
diff --git a/hadoop-hdds/common/src/test/resources/networkTopologyTestFiles/enforce-error.xml b/hadoop-hdds/common/src/test/resources/networkTopologyTestFiles/enforce-error.xml
new file mode 100644
index 0000000..58c5802
--- /dev/null
+++ b/hadoop-hdds/common/src/test/resources/networkTopologyTestFiles/enforce-error.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<configuration>
+  <layoutversion>1</layoutversion>
+  <layers>
+    <layer id="datacenter">
+      <prefix></prefix>
+      <cost>1</cost>
+      <type>ROOT</type>
+    </layer>
+    <layer id="rack">
+      <prefix>rack</prefix>
+      <cost>1</cost>
+      <type>InnerNode</type>
+    </layer>
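+    <!-- enforceprefix is true below, while this InnerNode layer has an empty
+    prefix, so the loader should reject the file with "layer without prefix
+    defined". -->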
+    <layer id="nodegroup">
+      <prefix></prefix>
+      <cost>1</cost>
+      <type>InnerNode</type>
+    </layer>
+    <layer id="node">
+      <prefix></prefix>
+      <cost>0</cost>
+      <type>Leaf</type>
+    </layer>
+  </layers>
+  <topology>
+    <path>/datacenter/rack/nodegroup/node</path>
+    <enforceprefix>true</enforceprefix>
+  </topology>
+</configuration>
\ No newline at end of file
diff --git a/hadoop-hdds/common/src/test/resources/networkTopologyTestFiles/good.xml b/hadoop-hdds/common/src/test/resources/networkTopologyTestFiles/good.xml
new file mode 100644
index 0000000..25be9c2
--- /dev/null
+++ b/hadoop-hdds/common/src/test/resources/networkTopologyTestFiles/good.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<configuration>
+  <layoutversion>1</layoutversion>
+  <layers>
+    <layer id="datacenter">
+      <prefix></prefix>
+      <cost>1</cost>
+      <type>Root</type>
+    </layer>
+    <layer id="rack">
+      <prefix>rack</prefix>
+      <cost>1</cost>
+      <type>InnerNode</type>
+      <default>/default-rack</default>
+    </layer>
+    <layer id="nodegroup">
+      <prefix>nodegroup</prefix>
+      <cost>1</cost>
+      <type>InnerNode</type>
+      <default>/default-nodegroup</default>
+    </layer>
+    <layer id="node">
+      <prefix></prefix>
+      <cost>0</cost>
+      <type>Leaf</type>
+    </layer>
+  </layers>
+  <topology>
+    <path>/datacenter/rack/nodegroup/node</path>
+    <enforceprefix>true</enforceprefix>
+  </topology>
+</configuration>
\ No newline at end of file
diff --git a/hadoop-hdds/common/src/test/resources/networkTopologyTestFiles/invalid-cost.xml b/hadoop-hdds/common/src/test/resources/networkTopologyTestFiles/invalid-cost.xml
new file mode 100644
index 0000000..cf934bc
--- /dev/null
+++ b/hadoop-hdds/common/src/test/resources/networkTopologyTestFiles/invalid-cost.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<configuration>
+  <layoutversion>1</layoutversion>
+  <layers>
+    <layer id="datacenter">
+      <prefix></prefix>
+      <cost>1</cost>
+      <type>ROOT</type>
+    </layer>
+    <layer id="rack">
+      <prefix>rack</prefix>
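+      <!-- A negative cost is invalid; the loader should fail with "Cost
+      should be positive number or 0". -->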
+      <cost>-1</cost>
+      <type>InnerNode</type>
+      <default>default-rack</default>
+    </layer>
+    <layer id="node">
+      <prefix></prefix>
+      <cost>0</cost>
+      <type>leaf</type>
+    </layer>
+  </layers>
+  <topology>
+    <path>/datacenter/rack/node</path>
+    <enforceprefix>false</enforceprefix>
+  </topology>
+</configuration>
\ No newline at end of file
diff --git a/hadoop-hdds/common/src/test/resources/networkTopologyTestFiles/invalid-version.xml b/hadoop-hdds/common/src/test/resources/networkTopologyTestFiles/invalid-version.xml
new file mode 100644
index 0000000..d69aab1
--- /dev/null
+++ b/hadoop-hdds/common/src/test/resources/networkTopologyTestFiles/invalid-version.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<configuration>
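+  <!-- A non-integer layoutversion; the loader should fail with "Bad
+  layoutversion value". -->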
+  <layoutversion>a</layoutversion>
+  <layers>
+    <layer id="datacenter">
+      <prefix></prefix>
+      <cost>1</cost>
+      <type>ROOT</type>
+    </layer>
+    <layer id="rack">
+      <prefix>rack</prefix>
+      <cost>-1</cost>
+      <type>InnerNode</type>
+      <default>default-rack</default>
+    </layer>
+    <layer id="node">
+      <prefix></prefix>
+      <cost>0</cost>
+      <type>leaf</type>
+    </layer>
+  </layers>
+  <topology>
+    <path>/datacenter/rack/node</path>
+    <enforceprefix>false</enforceprefix>
+  </topology>
+</configuration>
\ No newline at end of file
diff --git a/hadoop-hdds/common/src/test/resources/networkTopologyTestFiles/multiple-leaf.xml b/hadoop-hdds/common/src/test/resources/networkTopologyTestFiles/multiple-leaf.xml
new file mode 100644
index 0000000..a4297af
--- /dev/null
+++ b/hadoop-hdds/common/src/test/resources/networkTopologyTestFiles/multiple-leaf.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<configuration>
+  <layoutversion>1</layoutversion>
+  <layers>
+    <layer id="datacenter">
+      <prefix></prefix>
+      <cost>1</cost>
+      <type>ROOT</type>
+    </layer>
+    <layer id="rack">
+      <prefix>rack</prefix>
+      <cost>1</cost>
+      <type>Leaf</type>
+      <default>default-rack</default>
+    </layer>
+    <layer id="node">
+      <prefix></prefix>
+      <cost>0</cost>
+      <type>Leaf</type>
+    </layer>
+  </layers>
+  <topology>
+    <path>/datacenter/rack/node</path>
+    <enforceprefix>false</enforceprefix>
+  </topology>
+</configuration>
\ No newline at end of file
diff --git a/hadoop-hdds/common/src/test/resources/networkTopologyTestFiles/multiple-root.xml b/hadoop-hdds/common/src/test/resources/networkTopologyTestFiles/multiple-root.xml
new file mode 100644
index 0000000..afc7816
--- /dev/null
+++ b/hadoop-hdds/common/src/test/resources/networkTopologyTestFiles/multiple-root.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<configuration>
+  <layoutversion>1</layoutversion>
+  <layers>
+    <layer id="datacenter">
+      <prefix></prefix>
+      <cost>1</cost>
+      <type>ROOT</type>
+    </layer>
+    <layer id="rack">
+      <prefix>rack</prefix>
+      <cost>1</cost>
+      <type>ROOT</type>
+      <default>default-rack</default>
+    </layer>
+    <layer id="node">
+      <prefix></prefix>
+      <cost>0</cost>
+      <type>Leaf</type>
+    </layer>
+  </layers>
+  <topology>
+    <path>/datacenter/rack/node</path>
+    <enforceprefix>false</enforceprefix>
+  </topology>
+</configuration>
\ No newline at end of file
diff --git a/hadoop-hdds/common/src/test/resources/networkTopologyTestFiles/multiple-topology.xml b/hadoop-hdds/common/src/test/resources/networkTopologyTestFiles/multiple-topology.xml
new file mode 100644
index 0000000..a7322ca
--- /dev/null
+++ b/hadoop-hdds/common/src/test/resources/networkTopologyTestFiles/multiple-topology.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<configuration>
+  <layoutversion>1</layoutversion>
+  <layers>
+    <layer id="datacenter">
+      <prefix></prefix>
+      <cost>1</cost>
+      <type>ROOT</type>
+    </layer>
+    <layer id="rack">
+      <prefix>rack</prefix>
+      <cost>1</cost>
+      <type>InnerNode</type>
+      <default>default-rack</default>
+    </layer>
+    <layer id="node">
+      <prefix></prefix>
+      <cost>0</cost>
+      <type>Leaf</type>
+    </layer>
+  </layers>
+  <topology>
+    <path>/datacenter/rack/node</path>
+    <enforceprefix>false</enforceprefix>
+  </topology>
+  <topology>
+    <path>/datacenter/rack/node</path>
+    <enforceprefix>false</enforceprefix>
+  </topology>
+</configuration>
\ No newline at end of file
diff --git a/hadoop-hdds/common/src/test/resources/networkTopologyTestFiles/no-leaf.xml b/hadoop-hdds/common/src/test/resources/networkTopologyTestFiles/no-leaf.xml
new file mode 100644
index 0000000..fcc697c
--- /dev/null
+++ b/hadoop-hdds/common/src/test/resources/networkTopologyTestFiles/no-leaf.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<configuration>
+  <layoutversion>1</layoutversion>
+  <layers>
+    <layer id="datacenter">
+      <prefix></prefix>
+      <cost>1</cost>
+      <type>ROOT</type>
+    </layer>
+    <layer id="rack">
+      <prefix>rack</prefix>
+      <cost>1</cost>
+      <type>InnerNode</type>
+      <default>default-rack</default>
+    </layer>
+    <layer id="node">
+      <prefix></prefix>
+      <cost>0</cost>
+      <type>InnerNode</type>
+    </layer>
+  </layers>
+  <topology>
+    <path>/datacenter/rack/node</path>
+    <enforceprefix>false</enforceprefix>
+  </topology>
+</configuration>
\ No newline at end of file
diff --git a/hadoop-hdds/common/src/test/resources/networkTopologyTestFiles/no-root.xml b/hadoop-hdds/common/src/test/resources/networkTopologyTestFiles/no-root.xml
new file mode 100644
index 0000000..940696c
--- /dev/null
+++ b/hadoop-hdds/common/src/test/resources/networkTopologyTestFiles/no-root.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<configuration>
+  <layoutversion>1</layoutversion>
+  <layers>
+    <layer id="datacenter">
+      <prefix></prefix>
+      <cost>1</cost>
+      <type>InnerNode</type>
+    </layer>
+    <layer id="rack">
+      <prefix>rack</prefix>
+      <cost>1</cost>
+      <type>InnerNode</type>
+      <default>default-rack</default>
+    </layer>
+    <layer id="node">
+      <prefix></prefix>
+      <cost>0</cost>
+      <type>Leaf</type>
+    </layer>
+  </layers>
+  <topology>
+    <path>/datacenter/rack/node</path>
+    <enforceprefix>false</enforceprefix>
+  </topology>
+</configuration>
\ No newline at end of file
diff --git a/hadoop-hdds/common/src/test/resources/networkTopologyTestFiles/no-topology.xml b/hadoop-hdds/common/src/test/resources/networkTopologyTestFiles/no-topology.xml
new file mode 100644
index 0000000..c16e216
--- /dev/null
+++ b/hadoop-hdds/common/src/test/resources/networkTopologyTestFiles/no-topology.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<configuration>
+  <layoutversion>1</layoutversion>
+  <layers>
+    <layer id="datacenter">
+      <prefix></prefix>
+      <cost>1</cost>
+      <type>ROOT</type>
+    </layer>
+    <layer id="rack">
+      <prefix>rack</prefix>
+      <cost>1</cost>
+      <type>InnerNode</type>
+      <default>default-rack</default>
+    </layer>
+    <layer id="node">
+      <prefix></prefix>
+      <cost>0</cost>
+      <type>LEAF</type>
+    </layer>
+  </layers>
+</configuration>
\ No newline at end of file
diff --git a/hadoop-hdds/common/src/test/resources/networkTopologyTestFiles/path-layers-size-mismatch.xml b/hadoop-hdds/common/src/test/resources/networkTopologyTestFiles/path-layers-size-mismatch.xml
new file mode 100644
index 0000000..2c30219
--- /dev/null
+++ b/hadoop-hdds/common/src/test/resources/networkTopologyTestFiles/path-layers-size-mismatch.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<configuration>
+  <layoutversion>1</layoutversion>
+  <layers>
+    <layer id="datacenter">
+      <prefix></prefix>
+      <cost>1</cost>
+      <type>ROOT</type>
+    </layer>
+    <layer id="rack">
+      <prefix>rack</prefix>
+      <cost>1</cost>
+      <type>InnerNode</type>
+      <default>default-rack</default>
+    </layer>
+    <layer id="node">
+      <prefix></prefix>
+      <cost>0</cost>
+      <type>Leaf</type>
+    </layer>
+  </layers>
+  <topology>
+    <path>/datacenter/node</path>
+    <enforceprefix>false</enforceprefix>
+  </topology>
+</configuration>
\ No newline at end of file
diff --git a/hadoop-hdds/common/src/test/resources/networkTopologyTestFiles/path-with-id-reference-failure.xml b/hadoop-hdds/common/src/test/resources/networkTopologyTestFiles/path-with-id-reference-failure.xml
new file mode 100644
index 0000000..fac224b
--- /dev/null
+++ b/hadoop-hdds/common/src/test/resources/networkTopologyTestFiles/path-with-id-reference-failure.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<configuration>
+  <layoutversion>1</layoutversion>
+  <layers>
+    <layer id="datacenter">
+      <prefix></prefix>
+      <cost>1</cost>
+      <type>ROOT</type>
+    </layer>
+    <layer id="rack">
+      <prefix>rack</prefix>
+      <cost>1</cost>
+      <type>InnerNode</type>
+      <default>default-rack</default>
+    </layer>
+    <layer id="node">
+      <prefix></prefix>
+      <cost>0</cost>
+      <type>Leaf</type>
+    </layer>
+  </layers>
+  <topology>
+    <path>/datacenter/room/node</path>
+    <enforceprefix>false</enforceprefix>
+  </topology>
+</configuration>
\ No newline at end of file
diff --git a/hadoop-hdds/common/src/test/resources/networkTopologyTestFiles/unknown-layer-type.xml b/hadoop-hdds/common/src/test/resources/networkTopologyTestFiles/unknown-layer-type.xml
new file mode 100644
index 0000000..d228eec
--- /dev/null
+++ b/hadoop-hdds/common/src/test/resources/networkTopologyTestFiles/unknown-layer-type.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<configuration>
+  <layoutversion>1</layoutversion>
+  <layers>
+    <layer id="datacenter">
+      <prefix></prefix>
+      <cost>1</cost>
+      <type>ROOT</type>
+    </layer>
+    <layer id="rack">
+      <prefix>rack</prefix>
+      <cost>1</cost>
+      <type>InnerNode</type>
+      <default>default-rack</default>
+    </layer>
+    <layer id="node">
+      <prefix></prefix>
+      <cost>0</cost>
+      <type>leaves</type>
+    </layer>
+  </layers>
+  <topology>
+    <path>/datacenter/rack/node</path>
+    <enforceprefix>false</enforceprefix>
+  </topology>
+</configuration>
\ No newline at end of file
diff --git a/hadoop-hdds/common/src/test/resources/networkTopologyTestFiles/wrong-path-order-1.xml b/hadoop-hdds/common/src/test/resources/networkTopologyTestFiles/wrong-path-order-1.xml
new file mode 100644
index 0000000..221e10b
--- /dev/null
+++ b/hadoop-hdds/common/src/test/resources/networkTopologyTestFiles/wrong-path-order-1.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<configuration>
+  <layoutversion>1</layoutversion>
+  <layers>
+    <layer id="datacenter">
+      <prefix></prefix>
+      <cost>1</cost>
+      <type>ROOT</type>
+    </layer>
+    <layer id="rack">
+      <prefix>rack</prefix>
+      <cost>1</cost>
+      <type>InnerNode</type>
+      <default>default-rack</default>
+    </layer>
+    <layer id="node">
+      <prefix></prefix>
+      <cost>0</cost>
+      <type>Leaf</type>
+    </layer>
+  </layers>
+  <topology>
+    <path>/rack/datacenter/node</path>
+    <enforceprefix>false</enforceprefix>
+  </topology>
+</configuration>
\ No newline at end of file
diff --git a/hadoop-hdds/common/src/test/resources/networkTopologyTestFiles/wrong-path-order-2.xml b/hadoop-hdds/common/src/test/resources/networkTopologyTestFiles/wrong-path-order-2.xml
new file mode 100644
index 0000000..51e579e
--- /dev/null
+++ b/hadoop-hdds/common/src/test/resources/networkTopologyTestFiles/wrong-path-order-2.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<configuration>
+  <layoutversion>1</layoutversion>
+  <layers>
+    <layer id="datacenter">
+      <prefix></prefix>
+      <cost>1</cost>
+      <type>ROOT</type>
+    </layer>
+    <layer id="rack">
+      <prefix>rack</prefix>
+      <cost>1</cost>
+      <type>InnerNode</type>
+      <default>default-rack</default>
+    </layer>
+    <layer id="node">
+      <prefix></prefix>
+      <cost>0</cost>
+      <type>Leaf</type>
+    </layer>
+  </layers>
+  <topology>
+    <path>/datacenter/node/rack</path>
+    <enforceprefix>false</enforceprefix>
+  </topology>
+</configuration>
\ No newline at end of file
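
[Editor's note, not part of the patch: taken together, the fixtures above pin down the schema rules the loader enforces: exactly one ROOT layer that comes first in <path>, exactly one Leaf layer that comes last, every path element resolving to a declared layer id, a <path> whose length matches the layer count, a single <topology> element that must be present, an integer <layoutversion>, known layer types only, and non-negative <cost> values. Once a schema passes those checks, the topology it describes can be populated. A minimal sketch follows; the NetworkTopologyImpl(Configuration) and NodeImpl(name, location, cost) shapes are assumptions based on the files added in this commit, so verify the signatures there.]

    // Hedged sketch of populating a topology built from a valid schema such
    // as network-topology-default.xml; class and constructor shapes are
    // assumed from this commit, and the schema file must be resolvable on
    // the configured path for the constructor to succeed.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdds.scm.net.NetworkTopology;
    import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
    import org.apache.hadoop.hdds.scm.net.Node;
    import org.apache.hadoop.hdds.scm.net.NodeImpl;

    public class TopologySketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        NetworkTopology topology = new NetworkTopologyImpl(conf);
        // name, network location, cost; location follows the schema's
        // /datacenter/rack/node path convention.
        Node dn = new NodeImpl("node1", "/rack1", 0);
        topology.add(dn);
        System.out.println(topology.contains(dn)); // expected: true
      }
    }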
diff --git a/hadoop-ozone/dist/dev-support/bin/dist-layout-stitching b/hadoop-ozone/dist/dev-support/bin/dist-layout-stitching
index 9a470d5..229bed8 100755
--- a/hadoop-ozone/dist/dev-support/bin/dist-layout-stitching
+++ b/hadoop-ozone/dist/dev-support/bin/dist-layout-stitching
@@ -90,6 +90,8 @@ run cp "${ROOT}/hadoop-ozone/dist/src/main/conf/dn-audit-log4j2.properties" "etc
 run cp "${ROOT}/hadoop-ozone/dist/src/main/conf/scm-audit-log4j2.properties" "etc/hadoop"
 run cp "${ROOT}/hadoop-ozone/dist/src/main/conf/ozone-site.xml" "etc/hadoop"
 run cp -f "${ROOT}/hadoop-ozone/dist/src/main/conf/log4j.properties" "etc/hadoop"
+run cp "${ROOT}/hadoop-hdds/common/src/main/resources/network-topology-default.xml" "etc/hadoop"
+run cp "${ROOT}/hadoop-hdds/common/src/main/resources/network-topology-nodegroup.xml" "etc/hadoop"
 run cp "${ROOT}/hadoop-common-project/hadoop-common/src/main/bin/hadoop" "bin/"
 run cp "${ROOT}/hadoop-common-project/hadoop-common/src/main/bin/hadoop.cmd" "bin/"
 run cp "${ROOT}/hadoop-ozone/common/src/main/bin/ozone" "bin/"

