Posted to common-commits@hadoop.apache.org by we...@apache.org on 2019/11/15 20:17:39 UTC

[hadoop] branch trunk updated: HDFS-14882. Consider DataNode load when #getBlockLocation. Contributed by Xiaoqiao He.

This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c892a87  HDFS-14882. Consider DataNode load when #getBlockLocation. Contributed by Xiaoqiao He.
c892a87 is described below

commit c892a879ddce3abfd51c8609c81148bf6e4f9daa
Author: He Xiaoqiao <he...@apache.org>
AuthorDate: Fri Nov 15 12:15:33 2019 -0800

    HDFS-14882. Consider DataNode load when #getBlockLocation. Contributed by Xiaoqiao He.
    
    Signed-off-by: Wei-Chiu Chuang <we...@apache.org>
    
    Reviewed-by: Inigo Goiri <in...@apache.org>
    Reviewed-by: Istvan Fajth <pi...@cloudera.com>
---
 .../org/apache/hadoop/net/NetworkTopology.java     | 68 ++++++++++++++----
 .../java/org/apache/hadoop/hdfs/DFSConfigKeys.java |  4 ++
 .../server/blockmanagement/DatanodeManager.java    | 24 ++++++-
 .../src/main/resources/hdfs-default.xml            | 13 +++-
 .../blockmanagement/TestDatanodeManager.java       | 82 ++++++++++++++++++++++
 .../org/apache/hadoop/net/TestNetworkTopology.java | 34 ++++++---
 6 files changed, 199 insertions(+), 26 deletions(-)
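
For context: the NameNode already sorts a block's replica locations by
network distance to the reader. This change additionally allows replicas at
the same distance to be ordered by their current load (xceiver count)
rather than only being shuffled. A minimal standalone sketch of that
group-by-distance, tie-break-by-load idea, using a hypothetical SimpleNode
record (Java 16+) in place of the real DatanodeInfo class:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.List;
    import java.util.TreeMap;
    import java.util.function.Consumer;

    // Sketch only: SimpleNode stands in for DatanodeInfo. The grouping
    // mirrors NetworkTopology#sortByDistance, which buckets nodes by
    // distance weight and applies a secondary sort inside each bucket.
    public class SortByDistanceSketch {
      record SimpleNode(String name, int distance, int xceiverCount) { }

      static void sort(SimpleNode[] nodes,
          Consumer<List<SimpleNode>> secondarySort) {
        TreeMap<Integer, List<SimpleNode>> tree = new TreeMap<>();
        for (SimpleNode n : nodes) {
          tree.computeIfAbsent(n.distance(), k -> new ArrayList<>()).add(n);
        }
        int idx = 0;
        for (List<SimpleNode> bucket : tree.values()) {
          secondarySort.accept(bucket);      // shuffle, or sort by load
          for (SimpleNode n : bucket) {
            nodes[idx++] = n;
          }
        }
      }

      public static void main(String[] args) {
        SimpleNode[] nodes = {
            new SimpleNode("remote", 4, 10),
            new SimpleNode("rack-busy", 2, 50),
            new SimpleNode("rack-idle", 2, 1),
        };
        // Equivalent of dfs.namenode.read.considerLoad=true: break distance
        // ties by xceiver count, so rack-idle sorts before rack-busy.
        sort(nodes, list -> list.sort(
            Comparator.comparingInt(SimpleNode::xceiverCount)));
        System.out.println(Arrays.toString(nodes));
      }
    }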

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
index 724cec3..66799f5 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
@@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory;
 import java.util.*;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
 
 /** The class represents a cluster of computers with a tree hierarchical
  * network topology.
@@ -874,11 +875,33 @@ public class NetworkTopology {
      * This method is called if the reader is a datanode,
      * so nonDataNodeReader flag is set to false.
      */
-    sortByDistance(reader, nodes, activeLen, false);
+    sortByDistance(reader, nodes, activeLen, list -> Collections.shuffle(list));
   }
 
   /**
-   * Sort nodes array by network distance to <i>reader</i>.
+   * Sort nodes array by network distance to <i>reader</i> with a secondary sort.
+   * <p>
+   * In a three-level topology, a node can be either local, on the same rack,
+   * or on a different rack from the reader. Sorting the nodes based on network
+   * distance from the reader reduces network traffic and improves
+   * performance.
+   * <p>
+   * As an additional twist, we also randomize the nodes at each network
+   * distance. This helps with load balancing when there is data skew.
+   *
+   * @param reader    Node where data will be read
+   * @param nodes     Available replicas with the requested data
+   * @param activeLen Number of active nodes at the front of the array
+   * @param secondarySort a secondary sorting strategy that callers can
+   *     inject to order nodes at the same network distance.
+   */
+  public <T extends Node> void sortByDistance(Node reader, T[] nodes,
+      int activeLen, Consumer<List<T>> secondarySort){
+    sortByDistance(reader, nodes, activeLen, secondarySort, false);
+  }
+
+  /**
+   * Sort nodes array by network distance to <i>reader</i> with a secondary sort.
    * <p> using network location. This is used when the reader
    * is not a datanode. Sorting the nodes based on network distance
    * from the reader reduces network traffic and improves
@@ -895,7 +918,27 @@ public class NetworkTopology {
      * This method is called if the reader is not a datanode,
      * so nonDataNodeReader flag is set to true.
      */
-    sortByDistance(reader, nodes, activeLen, true);
+    sortByDistanceUsingNetworkLocation(reader, nodes, activeLen,
+        list -> Collections.shuffle(list));
+  }
+
+  /**
+   * Sort nodes array by network distance to <i>reader</i> using network
+   * location, with a secondary sort. This is used when the reader
+   * is not a datanode. Sorting the nodes based on network distance
+   * from the reader reduces network traffic and improves
+   * performance.
+   *
+   * @param reader    Node where data will be read
+   * @param nodes     Available replicas with the requested data
+   * @param activeLen Number of active nodes at the front of the array
+   * @param secondarySort a secondary sorting strategy that callers can
+   *     inject to order nodes at the same network distance.
+   */
+  public <T extends Node> void sortByDistanceUsingNetworkLocation(Node reader,
+      T[] nodes, int activeLen, Consumer<List<T>> secondarySort) {
+    sortByDistance(reader, nodes, activeLen, secondarySort, true);
   }
 
   /**
@@ -909,7 +952,8 @@ public class NetworkTopology {
    * @param activeLen Number of active nodes at the front of the array
    * @param nonDataNodeReader True if the reader is not a datanode
    */
-  private void sortByDistance(Node reader, Node[] nodes, int activeLen,
+  private <T extends Node> void sortByDistance(Node reader, T[] nodes,
+      int activeLen, Consumer<List<T>> secondarySort,
       boolean nonDataNodeReader) {
     /** Sort weights for the nodes array */
     int[] weights = new int[activeLen];
@@ -921,23 +965,23 @@ public class NetworkTopology {
       }
     }
     // Add weight/node pairs to a TreeMap to sort
-    TreeMap<Integer, List<Node>> tree = new TreeMap<Integer, List<Node>>();
+    TreeMap<Integer, List<T>> tree = new TreeMap<>();
     for (int i=0; i<activeLen; i++) {
       int weight = weights[i];
-      Node node = nodes[i];
-      List<Node> list = tree.get(weight);
+      T node = nodes[i];
+      List<T> list = tree.get(weight);
       if (list == null) {
         list = Lists.newArrayListWithExpectedSize(1);
         tree.put(weight, list);
       }
       list.add(node);
     }
-
+    // Sort nodes that have the same weight using secondarySort.
     int idx = 0;
-    for (List<Node> list: tree.values()) {
+    for (List<T> list: tree.values()) {
       if (list != null) {
-        Collections.shuffle(list, r);
-        for (Node n: list) {
+        secondarySort.accept(list);
+        for (T n: list) {
           nodes[idx] = n;
           idx++;
         }
@@ -946,4 +990,4 @@ public class NetworkTopology {
     Preconditions.checkState(idx == activeLen,
         "Sorted the wrong number of nodes!");
   }
-}
+}
\ No newline at end of file
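
The new public overloads take a Consumer<List<T>> so callers can inject
their own tie-breaker for nodes at equal network distance. A hedged usage
sketch against the NetworkTopology API; the host names, rack locations and
the name-based comparator are illustrative, not part of this commit:

    import java.util.Collections;
    import java.util.Comparator;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.net.NetworkTopology;
    import org.apache.hadoop.net.Node;
    import org.apache.hadoop.net.NodeBase;

    public class SecondarySortUsage {
      public static void main(String[] args) {
        NetworkTopology topology =
            NetworkTopology.getInstance(new Configuration());
        Node n1 = new NodeBase("host1", "/d1/r1");
        Node n2 = new NodeBase("host2", "/d1/r1");
        Node n3 = new NodeBase("host3", "/d1/r2");
        topology.add(n1);
        topology.add(n2);
        topology.add(n3);

        Node[] nodes = {n3, n2, n1};
        // Default behavior: random tie-break among equal-distance nodes.
        topology.sortByDistance(n1, nodes, nodes.length,
            list -> Collections.shuffle(list));
        // Custom tie-break, e.g. deterministic by node name.
        topology.sortByDistance(n1, nodes, nodes.length,
            list -> list.sort(Comparator.comparing(Node::getName)));
      }
    }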
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index a2df317..1c3a71f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -230,6 +230,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY;
   public static final boolean DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_DEFAULT =
       true;
+  public static final String  DFS_NAMENODE_READ_CONSIDERLOAD_KEY =
+      "dfs.namenode.read.considerLoad";
+  public static final boolean DFS_NAMENODE_READ_CONSIDERLOAD_DEFAULT =
+      false;
   public static final String  DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_FACTOR =
       "dfs.namenode.redundancy.considerLoad.factor";
   public static final double
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 049c949..8adb03d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -66,6 +65,7 @@ import java.net.UnknownHostException;
 import java.util.*;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 
 /**
  * Manage datanodes, include decommission and other activities.
@@ -134,6 +134,9 @@ public class DatanodeManager {
   /** Whether or not to avoid using stale DataNodes for reading */
   private final boolean avoidStaleDataNodesForRead;
 
+  /** Whether or not to consider load for reading. */
+  private final boolean readConsiderLoad;
+
   /**
    * Whether or not to avoid using stale DataNodes for writing.
    * Note that, even if this is configured, the policy may be
@@ -314,6 +317,9 @@ public class DatanodeManager {
     this.avoidStaleDataNodesForRead = conf.getBoolean(
         DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY,
         DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_DEFAULT);
+    this.readConsiderLoad = conf.getBoolean(
+        DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERLOAD_KEY,
+        DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERLOAD_DEFAULT);
     this.avoidStaleDataNodesForWrite = conf.getBoolean(
         DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY,
         DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT);
@@ -530,9 +536,10 @@ public class DatanodeManager {
     int activeLen = lastActiveIndex + 1;
     if(nonDatanodeReader) {
       networktopology.sortByDistanceUsingNetworkLocation(client,
-          lb.getLocations(), activeLen);
+          lb.getLocations(), activeLen, createSecondaryNodeSorter());
     } else {
-      networktopology.sortByDistance(client, lb.getLocations(), activeLen);
+      networktopology.sortByDistance(client, lb.getLocations(), activeLen,
+          createSecondaryNodeSorter());
     }
     // move PROVIDED storage to the end to prefer local replicas.
     lb.moveProvidedToEnd(activeLen);
@@ -540,6 +547,17 @@ public class DatanodeManager {
     lb.updateCachedStorageInfo();
   }
 
+  private Consumer<List<DatanodeInfo>> createSecondaryNodeSorter() {
+    Consumer<List<DatanodeInfo>> secondarySort =
+        list -> Collections.shuffle(list);
+    if (readConsiderLoad) {
+      Comparator<DatanodeInfo> comp =
+          Comparator.comparingInt(DatanodeInfo::getXceiverCount);
+      secondarySort = list -> Collections.sort(list, comp);
+    }
+    return secondarySort;
+  }
+
   /** @return the datanode descriptor for the host. */
   public DatanodeDescriptor getDatanodeByHost(final String host) {
     return host2DatanodeMap.getDatanodeByHost(host);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 98b91a6..7effbd0 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -307,7 +307,9 @@
 <property>
   <name>dfs.namenode.redundancy.considerLoad</name>
   <value>true</value>
-  <description>Decide if chooseTarget considers the target's load or not
+  <description>
+    Decide whether chooseTarget considers the target's load when choosing
+    replicas for writes. On by default.
   </description>
 </property>
 
@@ -320,6 +322,15 @@
   </property>
 
 <property>
+  <name>dfs.namenode.read.considerLoad</name>
+  <value>false</value>
+  <description>
+    Decide whether sorting block locations considers the target's load
+    when serving reads. Off by default.
+  </description>
+</property>
+
+<property>
   <name>dfs.datanode.httpserver.filter.handlers</name>
   <value>org.apache.hadoop.hdfs.server.datanode.web.RestCsrfPreventionFilterHandler</value>
   <description>Comma separated list of Netty servlet-style filter handlers to inject into the Datanode WebHDFS I/O path
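
Operators would normally set dfs.namenode.read.considerLoad in
hdfs-site.xml; the same property can also be set programmatically, for
example when building a Configuration in a test. A short sketch using the
key added by this commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class EnableReadConsiderLoad {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // dfs.namenode.read.considerLoad is off by default; opt in so the
        // NameNode breaks read-sort ties by DataNode load.
        conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERLOAD_KEY,
            true);
        System.out.println(conf.getBoolean(
            DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERLOAD_KEY, false));
      }
    }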
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
index 210e434..e8e6b94 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
@@ -518,6 +518,88 @@ public class TestDatanodeManager {
     assertEquals(locs[4].getIpAddr(), sortedLocs2[0].getIpAddr());
   }
 
+  @Test
+  public void testGetBlockLocationConsiderLoad()
+      throws IOException, URISyntaxException {
+    Configuration conf = new Configuration();
+    conf.setBoolean(
+        DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERLOAD_KEY, true);
+    conf.setBoolean(
+        DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true);
+    FSNamesystem fsn = Mockito.mock(FSNamesystem.class);
+    Mockito.when(fsn.hasWriteLock()).thenReturn(true);
+    URL shellScript = getClass().getResource(
+        "/" + Shell.appendScriptExtension("topology-script"));
+    Path resourcePath = Paths.get(shellScript.toURI());
+    FileUtil.setExecutable(resourcePath.toFile(), true);
+    conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY,
+        resourcePath.toString());
+    DatanodeManager dm = mockDatanodeManager(fsn, conf);
+
+    int totalDNs = 5;
+    // Register 5 datanodes, two per rack, each with a different load.
+    DatanodeInfo[] locs = new DatanodeInfo[totalDNs];
+    String[] storageIDs = new String[totalDNs];
+    for (int i = 0; i < totalDNs; i++) {
+      // Register new datanode.
+      String uuid = "UUID-" + i;
+      String ip = "IP-" + i / 2 + "-" + i;
+      DatanodeRegistration dr = Mockito.mock(DatanodeRegistration.class);
+      Mockito.when(dr.getDatanodeUuid()).thenReturn(uuid);
+      Mockito.when(dr.getIpAddr()).thenReturn(ip);
+      dm.registerDatanode(dr);
+
+      // Get location and storage information.
+      locs[i] = dm.getDatanode(uuid);
+      storageIDs[i] = "storageID-" + i;
+
+      // Set load for datanodes.
+      locs[i].setXceiverCount(i);
+    }
+
+    // Set node 0 decommissioned.
+    locs[0].setDecommissioned();
+
+    // Create LocatedBlock with above locations.
+    ExtendedBlock b = new ExtendedBlock("somePoolID", 1234);
+    LocatedBlock block = new LocatedBlock(b, locs);
+    List<LocatedBlock> blocks = new ArrayList<>();
+    blocks.add(block);
+
+    // Test client located at locs[3] in cluster.
+    final String targetIpInCluster = locs[3].getIpAddr();
+    dm.sortLocatedBlocks(targetIpInCluster, blocks);
+    DatanodeInfo[] sortedLocs = block.getLocations();
+    assertEquals(totalDNs, sortedLocs.length);
+    // Ensure the local node is first.
+    assertEquals(targetIpInCluster, sortedLocs[0].getIpAddr());
+    // Ensure lighter-loaded nodes sort first when distances are equal.
+    assertEquals(locs[3].getIpAddr(), sortedLocs[0].getIpAddr());
+    assertEquals(locs[2].getIpAddr(), sortedLocs[1].getIpAddr());
+    assertEquals(locs[1].getIpAddr(), sortedLocs[2].getIpAddr());
+    assertEquals(locs[4].getIpAddr(), sortedLocs[3].getIpAddr());
+    // Ensure the decommissioned DN was moved to the end.
+    assertThat(sortedLocs[4].getAdminState(),
+        is(DatanodeInfo.AdminStates.DECOMMISSIONED));
+    assertEquals(locs[0].getIpAddr(), sortedLocs[4].getIpAddr());
+
+    // Test client not in cluster but same rack with locs[3].
+    final String targetIpNotInCluster = locs[3].getIpAddr() + "-client";
+    dm.sortLocatedBlocks(targetIpNotInCluster, blocks);
+    DatanodeInfo[] sortedLocs2 = block.getLocations();
+    assertEquals(totalDNs, sortedLocs2.length);
+    // Ensure the local rack comes first and the lighter-loaded node
+    // sorts first when distances are equal.
+    assertEquals(locs[2].getIpAddr(), sortedLocs2[0].getIpAddr());
+    assertEquals(locs[3].getIpAddr(), sortedLocs2[1].getIpAddr());
+    assertEquals(locs[1].getIpAddr(), sortedLocs2[2].getIpAddr());
+    assertEquals(locs[4].getIpAddr(), sortedLocs2[3].getIpAddr());
+    // Ensure the decommissioned DN was moved to the end.
+    assertThat(sortedLocs2[4].getAdminState(),
+        is(DatanodeInfo.AdminStates.DECOMMISSIONED));
+    assertEquals(locs[0].getIpAddr(), sortedLocs2[4].getIpAddr());
+  }
+
   /**
    * Test whether removing a host from the includes list without adding it to
    * the excludes list will exclude it from data node reports.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java
index 9466a75..f16bfb7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java
@@ -241,10 +241,16 @@ public class TestNetworkTopology {
     cluster.setRandomSeed(0xDEADBEEF);
     cluster.sortByDistance(dataNodes[8], dtestNodes, dtestNodes.length - 2);
     assertTrue(dtestNodes[0] == dataNodes[8]);
-    assertTrue(dtestNodes[1] == dataNodes[11]);
-    assertTrue(dtestNodes[2] == dataNodes[12]);
-    assertTrue(dtestNodes[3] == dataNodes[9]);
-    assertTrue(dtestNodes[4] == dataNodes[10]);
+    assertTrue(dtestNodes[1] != dtestNodes[2]);
+    assertTrue(dtestNodes[1] == dataNodes[11]
+        || dtestNodes[1] == dataNodes[12]);
+    assertTrue(dtestNodes[2] == dataNodes[11]
+        || dtestNodes[2] == dataNodes[12]);
+    assertTrue(dtestNodes[3] != dtestNodes[4]);
+    assertTrue(dtestNodes[3] == dataNodes[9]
+        || dtestNodes[3] == dataNodes[10]);
+    assertTrue(dtestNodes[4] == dataNodes[9]
+        || dtestNodes[4] == dataNodes[10]);
 
     // array contains local node
     testNodes[0] = dataNodes[1];
@@ -331,10 +337,14 @@ public class TestNetworkTopology {
     testNodes[2] = dataNodes[8];
     Node rackClient = new NodeBase("/d3/r1/25.25.25");
     cluster.setRandomSeed(0xDEADBEEF);
-    cluster.sortByDistance(rackClient, testNodes, testNodes.length);
+    cluster.sortByDistanceUsingNetworkLocation(rackClient, testNodes,
+        testNodes.length);
     assertTrue(testNodes[0] == dataNodes[8]);
-    assertTrue(testNodes[1] == dataNodes[5]);
-    assertTrue(testNodes[2] == dataNodes[0]);
+    assertTrue(testNodes[1] != testNodes[2]);
+    assertTrue(testNodes[1] == dataNodes[0]
+        || testNodes[1] == dataNodes[5]);
+    assertTrue(testNodes[2] == dataNodes[0]
+        || testNodes[2] == dataNodes[5]);
 
     //Reader is not a datanode , but is in one of the datanode's data center.
     testNodes[0] = dataNodes[8];
@@ -342,10 +352,14 @@ public class TestNetworkTopology {
     testNodes[2] = dataNodes[0];
     Node dcClient = new NodeBase("/d1/r2/25.25.25");
     cluster.setRandomSeed(0xDEADBEEF);
-    cluster.sortByDistance(dcClient, testNodes, testNodes.length);
+    cluster.sortByDistanceUsingNetworkLocation(dcClient, testNodes,
+        testNodes.length);
     assertTrue(testNodes[0] == dataNodes[0]);
-    assertTrue(testNodes[1] == dataNodes[5]);
-    assertTrue(testNodes[2] == dataNodes[8]);
+    assertTrue(testNodes[1] != testNodes[2]);
+    assertTrue(testNodes[1] == dataNodes[5]
+        || testNodes[1] == dataNodes[8]);
+    assertTrue(testNodes[2] == dataNodes[5]
+        || testNodes[2] == dataNodes[8]);
 
   }
   


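A side effect visible in the test changes above: the default secondary sort
is now list -> Collections.shuffle(list), which does not use the topology's
seeded Random, so setRandomSeed(0xDEADBEEF) no longer pins the order of
equal-distance nodes and the assertions were loosened to membership checks.
Tests that need a reproducible order can inject a seeded shuffle themselves;
a sketch with illustrative node names:

    import java.util.Collections;
    import java.util.Random;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.net.NetworkTopology;
    import org.apache.hadoop.net.Node;
    import org.apache.hadoop.net.NodeBase;

    public class SeededShuffleSketch {
      public static void main(String[] args) {
        NetworkTopology cluster =
            NetworkTopology.getInstance(new Configuration());
        Node a = new NodeBase("hostA", "/d1/r1");
        Node b = new NodeBase("hostB", "/d1/r1");
        cluster.add(a);
        cluster.add(b);

        Node[] nodes = {b, a};
        Random seeded = new Random(0xDEADBEEF);
        // Same random tie-break as the default, but reproducible.
        cluster.sortByDistance(a, nodes, nodes.length,
            list -> Collections.shuffle(list, seeded));
      }
    }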