You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2016/12/09 15:51:37 UTC

[11/33] hadoop git commit: HDFS-10206. Datanodes not sorted properly by distance when the reader isn't a datanode. (Nandakumar via mingma)

HDFS-10206. Datanodes not sorted properly by distance when the reader isn't a datanode. (Nandakumar via mingma)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c73e08a6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c73e08a6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c73e08a6

Branch: refs/heads/YARN-5972
Commit: c73e08a6dad46cad14b38a4a586a5cda1622b206
Parents: 563480d
Author: Ming Ma <mi...@apache.org>
Authored: Wed Dec 7 08:26:09 2016 -0800
Committer: Ming Ma <mi...@apache.org>
Committed: Wed Dec 7 08:26:09 2016 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/net/NetworkTopology.java  | 158 +++++++++++++++++--
 .../server/blockmanagement/DatanodeManager.java |  12 +-
 .../apache/hadoop/net/TestNetworkTopology.java  |  29 +++-
 3 files changed, 182 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c73e08a6/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
index 14c870d..5751d2b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
@@ -57,6 +57,10 @@ public class NetworkTopology {
   public static final Logger LOG =
       LoggerFactory.getLogger(NetworkTopology.class);
 
+  private static final char PATH_SEPARATOR = '/';
+  private static final String PATH_SEPARATOR_STR = "/";
+  private static final String ROOT = "/";
+
   public static class InvalidTopologyException extends RuntimeException {
     private static final long serialVersionUID = 1L;
     public InvalidTopologyException(String msg) {
@@ -916,7 +920,7 @@ public class NetworkTopology {
     }
   }
 
-  /** convert a network tree to a string */
+  /** convert a network tree to a string. */
   @Override
   public String toString() {
     // print the number of racks
@@ -970,19 +974,108 @@ public class NetworkTopology {
    * @return weight
    */
   protected int getWeight(Node reader, Node node) {
-    // 0 is local, 1 is same rack, 2 is off rack
-    // Start off by initializing to off rack
-    int weight = 2;
-    if (reader != null) {
-      if (reader.equals(node)) {
-        weight = 0;
-      } else if (isOnSameRack(reader, node)) {
-        weight = 1;
+    // Weight is the tree distance from reader to node: 0 is local, 2 is same
+    // rack, and every extra level climbed on either side adds 1.
+    // Stays at Integer.MAX_VALUE when reader or node is null.
+    int weight = Integer.MAX_VALUE;
+    if (reader != null && node != null) {
+      if(reader.equals(node)) {
+        return 0;
+      }
+      int maxReaderLevel = reader.getLevel();
+      int maxNodeLevel = node.getLevel();
+      int currentLevelToCompare = maxReaderLevel > maxNodeLevel ?
+          maxNodeLevel : maxReaderLevel;
+      Node r = reader;
+      Node n = node;
+      weight = 0;
+      while(r != null && r.getLevel() > currentLevelToCompare) {
+        r = r.getParent();
+        weight++;
+      }
+      while(n != null && n.getLevel() > currentLevelToCompare) {
+        n = n.getParent();
+        weight++;
+      }
+      while(r != null && n != null && !r.equals(n)) {
+        r = r.getParent();
+        n = n.getParent();
+        weight+=2;
+      }
+    }
+    return weight;
+  }
+
+  /**
+   * Returns an integer weight which specifies how far away <i>node</i> is
+   * from <i>reader</i>, derived from their network location strings only.
+   * A lower value signifies a closer node: 0 same name, 2 same location.
+   *
+   * @param reader Node where data will be read
+   * @param node Replica of data
+   * @return weight, or Integer.MAX_VALUE if either argument is null
+   */
+  private static int getWeightUsingNetworkLocation(Node reader, Node node) {
+    //Start off by initializing to Integer.MAX_VALUE
+    int weight = Integer.MAX_VALUE;
+    if(reader != null && node != null) {
+      String readerPath = normalizeNetworkLocationPath(
+          reader.getNetworkLocation());
+      String nodePath = normalizeNetworkLocationPath(
+          node.getNetworkLocation());
+
+      //same rack
+      if(readerPath.equals(nodePath)) {
+        // 0 when both sides carry the same name, 2 for rack-mates
+        weight = reader.getName().equals(node.getName()) ? 0 : 2;
+      } else {
+        String[] readerPathToken = readerPath.split(PATH_SEPARATOR_STR);
+        String[] nodePathToken = nodePath.split(PATH_SEPARATOR_STR);
+        int maxLevelToCompare = readerPathToken.length > nodePathToken.length ?
+            nodePathToken.length : readerPathToken.length;
+        int currentLevel = 1;
+        //traverse through the path and calculate the distance
+        while(currentLevel < maxLevelToCompare) {
+          if(!readerPathToken[currentLevel]
+              .equals(nodePathToken[currentLevel])){
+            break;
+          }
+          currentLevel++;
+        }
+        // The tokens only cover the rack paths; add 2 for the hop from each
+        // endpoint up to its own rack so that off-rack nodes do not tie with
+        // rack-local ones (weight 2) and the scale matches getWeight().
+        weight = (readerPathToken.length - currentLevel) +
+            (nodePathToken.length - currentLevel) + 2;
       }
     }
     return weight;
   }
 
+  /** Normalize a path by stripping off one trailing {@link #PATH_SEPARATOR}.
+   * @param path path to normalize.
+   * @return the normalized path; {@link #ROOT} if <i>path</i> is null or
+   * empty. A path that is only the separator comes back as "".
+   * @throws IllegalArgumentException if the first character of a non empty path
+   * is not {@link #PATH_SEPARATOR}
+   */
+  private static String normalizeNetworkLocationPath(String path) {
+    if (path == null || path.length() == 0) {
+      return ROOT;
+    }
+
+    if (path.charAt(0) != PATH_SEPARATOR) {
+      throw new IllegalArgumentException("Network Location "
+          + "path doesn't start with " + PATH_SEPARATOR + ": " + path);
+    }
+
+    int len = path.length();
+    if (path.charAt(len-1) == PATH_SEPARATOR) {
+      return path.substring(0, len-1);
+    }
+    return path;
+  }
+
   /**
    * Sort nodes array by network distance to <i>reader</i>.
    * <p/>
@@ -999,10 +1092,55 @@ public class NetworkTopology {
    * @param activeLen Number of active nodes at the front of the array
    */
   public void sortByDistance(Node reader, Node[] nodes, int activeLen) {
+    /*
+     * This overload serves readers that are datanodes, so weights come
+     * from getWeight(); the nonDataNodeReader flag is therefore false.
+     */
+    sortByDistance(reader, nodes, activeLen, false);
+  }
+
+  /**
+   * Sort nodes array by network distance to <i>reader</i>, measured
+   * from the network location of each node. This is used when the reader
+   * is not a datanode. Sorting the nodes based on network distance
+   * from the reader reduces network traffic and improves
+   * performance.
+   * <p/>
+   *
+   * @param reader    Node where data will be read
+   * @param nodes     Available replicas with the requested data
+   * @param activeLen Number of active nodes at the front of the array
+   */
+  public void sortByDistanceUsingNetworkLocation(Node reader, Node[] nodes,
+      int activeLen) {
+    /*
+     * This method is called if the reader is not a datanode,
+     * so nonDataNodeReader flag is set to true.
+     */
+    sortByDistance(reader, nodes, activeLen, true);
+  }
+
+  /**
+   * Sort nodes array by network distance to <i>reader</i>.
+   * <p/>
+   * As an additional twist, we also randomize the nodes at each network
+   * distance. This helps with load balancing when there is data skew.
+   *
+   * @param reader    Node where data will be read
+   * @param nodes     Available replicas with the requested data
+   * @param activeLen Number of active nodes at the front of the array
+   * @param nonDataNodeReader True if the reader is not a datanode
+   */
+  private void sortByDistance(Node reader, Node[] nodes, int activeLen,
+      boolean nonDataNodeReader) {
     /** Sort weights for the nodes array */
     int[] weights = new int[activeLen];
     for (int i=0; i<activeLen; i++) {
-      weights[i] = getWeight(reader, nodes[i]);
+      if(nonDataNodeReader) {
+        weights[i] = getWeightUsingNetworkLocation(reader, nodes[i]);
+      } else {
+        weights[i] = getWeight(reader, nodes[i]);
+      }
     }
     // Add weight/node pairs to a TreeMap to sort
     TreeMap<Integer, List<Node>> tree = new TreeMap<Integer, List<Node>>();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c73e08a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 47f15c4..6477d5c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -443,9 +443,11 @@ public class DatanodeManager {
       Comparator<DatanodeInfo> comparator) {
     // As it is possible for the separation of node manager and datanode, 
     // here we should get node but not datanode only .
+    boolean nonDatanodeReader = false;
     Node client = getDatanodeByHost(targetHost);
     if (client == null) {
-      List<String> hosts = new ArrayList<> (1);
+      nonDatanodeReader = true;
+      List<String> hosts = new ArrayList<>(1);
       hosts.add(targetHost);
       List<String> resolvedHosts = dnsToSwitchMapping.resolve(hosts);
       if (resolvedHosts != null && !resolvedHosts.isEmpty()) {
@@ -470,8 +472,12 @@ public class DatanodeManager {
       --lastActiveIndex;
     }
     int activeLen = lastActiveIndex + 1;
-    networktopology.sortByDistance(client, lb.getLocations(), activeLen);
-
+    if(nonDatanodeReader) {
+      networktopology.sortByDistanceUsingNetworkLocation(client,
+          lb.getLocations(), activeLen);
+    } else {
+      networktopology.sortByDistance(client, lb.getLocations(), activeLen);
+    }
     // must update cache since we modified locations array
     lb.updateCachedStorageInfo();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c73e08a6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java
index d149f65..3a281fc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java
@@ -220,11 +220,9 @@ public class TestNetworkTopology {
     testNodes[2] = dataNodes[3];
     cluster.setRandomSeed(0xDEAD);
     cluster.sortByDistance(dataNodes[0], testNodes, testNodes.length);
-    // sortByDistance does not take the "data center" layer into consideration
-    // and it doesn't sort by getDistance, so 1, 5, 3 is also valid here
     assertTrue(testNodes[0] == dataNodes[1]);
-    assertTrue(testNodes[1] == dataNodes[5]);
-    assertTrue(testNodes[2] == dataNodes[3]);
+    assertTrue(testNodes[1] == dataNodes[3]);
+    assertTrue(testNodes[2] == dataNodes[5]);
 
     // Array of just rack-local nodes
     // Expect a random first node
@@ -264,6 +262,29 @@ public class TestNetworkTopology {
       }
     }
     assertTrue("Expected to find a different first location", foundRandom);
+
+    //Reader is not a datanode, but is in one of the datanode's rack.
+    testNodes[0] = dataNodes[0];
+    testNodes[1] = dataNodes[5];
+    testNodes[2] = dataNodes[8];
+    Node rackClient = new NodeBase("/d3/r1/25.25.25");
+    cluster.setRandomSeed(0xDEADBEEF);
+    cluster.sortByDistance(rackClient, testNodes, testNodes.length);
+    assertTrue(testNodes[0] == dataNodes[8]);
+    assertTrue(testNodes[1] == dataNodes[5]);
+    assertTrue(testNodes[2] == dataNodes[0]);
+
+    //Reader is not a datanode , but is in one of the datanode's data center.
+    testNodes[0] = dataNodes[8];
+    testNodes[1] = dataNodes[5];
+    testNodes[2] = dataNodes[0];
+    Node dcClient = new NodeBase("/d1/r2/25.25.25");
+    cluster.setRandomSeed(0xDEADBEEF);
+    cluster.sortByDistance(dcClient, testNodes, testNodes.length);
+    assertTrue(testNodes[0] == dataNodes[0]);
+    assertTrue(testNodes[1] == dataNodes[5]);
+    assertTrue(testNodes[2] == dataNodes[8]);
+
   }
   
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org