Posted to common-commits@hadoop.apache.org by we...@apache.org on 2021/03/31 03:26:22 UTC

[hadoop] 01/07: HDFS-14383. Compute datanode load based on StoragePolicy. Contributed by Ayush Saxena.

This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 7f20fad41912217c03cdd7f2d54c01e40f49d051
Author: Ayush Saxena <ay...@apache.org>
AuthorDate: Mon Oct 19 10:48:47 2020 +0530

    HDFS-14383. Compute datanode load based on StoragePolicy. Contributed by Ayush Saxena.
    
    (cherry picked from commit 2e8cafac3b071fe5b943542827fd8a496b137fa9)
---
 .../java/org/apache/hadoop/hdfs/DFSConfigKeys.java |  5 ++
 .../BlockPlacementPolicyDefault.java               | 55 ++++++++++++++++++++--
 .../server/blockmanagement/DatanodeManager.java    |  6 +++
 .../server/blockmanagement/FSClusterStats.java     |  9 ++++
 .../server/blockmanagement/StorageTypeStats.java   | 16 +++++++
 .../src/main/resources/hdfs-default.xml            | 11 +++++
 .../blockmanagement/TestBlockStatsMXBean.java      | 52 ++++++++++++++++++++
 .../blockmanagement/TestReplicationPolicy.java     | 29 ++++++++++++
 8 files changed, 180 insertions(+), 3 deletions(-)
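
In short, the patch teaches BlockPlacementPolicyDefault to compare a candidate
datanode's xceiver load against the average load of only those in-service nodes
that share its storage types, rather than against the cluster-wide average. A
minimal sketch of switching the new behavior on, using the key constants this
patch adds (the wrapper class itself is illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class EnableConsiderLoadByStorageType {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // The per-storage-type check only takes effect when load-based
        // exclusion itself is enabled (true by default).
        conf.setBoolean(
            DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY, true);
        conf.setBoolean(
            DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_KEY,
            true);
      }
    }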

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index f7da4cb..75b7389 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -239,6 +239,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY;
   public static final boolean DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_DEFAULT =
       true;
+  public static final String
+      DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_KEY =
+      "dfs.namenode.redundancy.considerLoadByStorageType";
+  public static final boolean
+      DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_DEFAULT = false;
   public static final String  DFS_NAMENODE_READ_CONSIDERLOAD_KEY =
       "dfs.namenode.read.considerLoad";
   public static final boolean DFS_NAMENODE_READ_CONSIDERLOAD_DEFAULT =
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
index a1a83b0..5761690 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_KEY;
 import static org.apache.hadoop.util.Time.monotonicNow;
 
 import java.util.*;
@@ -92,7 +94,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     }
   }
 
-  protected boolean considerLoad; 
+  protected boolean considerLoad;
+  private boolean considerLoadByStorageType;
   protected double considerLoadFactor;
   private boolean preferLocalNode;
   protected NetworkTopology clusterMap;
@@ -116,6 +119,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     this.considerLoad = conf.getBoolean(
         DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY,
         DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_DEFAULT);
+    this.considerLoadByStorageType = conf.getBoolean(
+        DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_KEY,
+        DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_DEFAULT);
     this.considerLoadFactor = conf.getDouble(
         DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_FACTOR,
         DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_FACTOR_DEFAULT);
@@ -976,8 +982,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
    * @return Return true if the datanode should be excluded, otherwise false
    */
   boolean excludeNodeByLoad(DatanodeDescriptor node){
-    final double maxLoad = considerLoadFactor *
-        stats.getInServiceXceiverAverage();
+    double inServiceXceiverCount = getInServiceXceiverAverage(node);
+    final double maxLoad = considerLoadFactor * inServiceXceiverCount;
+
     final int nodeLoad = node.getXceiverCount();
     if ((nodeLoad > maxLoad) && (maxLoad > 0)) {
       logNodeIsNotChosen(node, NodeNotChosenReason.NODE_TOO_BUSY,
@@ -988,6 +995,48 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
   }
 
   /**
+   * Gets the inServiceXceiver average count for the cluster. If
+   * considerLoadByStorageType is true, the load is calculated only for the
+   * storage types present on the datanode.
+   * @param node the datanode whose storage types are to be taken into account.
+   * @return the in-service xceiver average count.
+   */
+  private double getInServiceXceiverAverage(DatanodeDescriptor node) {
+    double inServiceXceiverCount;
+    if (considerLoadByStorageType) {
+      inServiceXceiverCount =
+          getInServiceXceiverAverageByStorageType(node.getStorageTypes());
+    } else {
+      inServiceXceiverCount = stats.getInServiceXceiverAverage();
+    }
+    return inServiceXceiverCount;
+  }
+
+  /**
+   * Gets the average xceiver count with respect to the storage types.
+   * @param storageTypes the storage types.
+   * @return the average xceiver count with respect to the provided storage
+   *         types.
+   */
+  private double getInServiceXceiverAverageByStorageType(
+      Set<StorageType> storageTypes) {
+    double avgLoad = 0;
+    final Map<StorageType, StorageTypeStats> storageStats =
+        stats.getStorageTypeStats();
+    int numNodes = 0;
+    int numXceiver = 0;
+    for (StorageType s : storageTypes) {
+      StorageTypeStats storageTypeStats = storageStats.get(s);
+      numNodes += storageTypeStats.getNodesInService();
+      numXceiver += storageTypeStats.getNodesInServiceXceiverCount();
+    }
+    if (numNodes != 0) {
+      avgLoad = (double) numXceiver / numNodes;
+    }
+
+    return avgLoad;
+  }
+
+  /**
    * Determine if a datanode is good for placing block.
    *
    * @param node The target datanode
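
To make excludeNodeByLoad above concrete: with considerLoadByStorageType
enabled, the threshold is considerLoadFactor times the per-storage-type average
rather than the cluster average. A self-contained sketch of the same
average-then-threshold arithmetic with made-up numbers (class name and figures
are illustrative, not from the patch):

    public class LoadByStorageTypeMath {
      public static void main(String[] args) {
        // Suppose the candidate node carries only DISK, and the cluster has
        // 10 in-service DISK nodes with 500 xceivers between them.
        int diskNodes = 10;
        int diskXceivers = 500;
        double avgLoad = (double) diskXceivers / diskNodes; // 50.0
        double considerLoadFactor = 2.0;                    // hdfs-default.xml default
        double maxLoad = considerLoadFactor * avgLoad;      // 100.0
        int nodeLoad = 100;
        // excludeNodeByLoad uses a strict comparison, so a node sitting
        // exactly at the threshold remains eligible.
        boolean excluded = nodeLoad > maxLoad && maxLoad > 0;
        System.out.println(excluded); // false
      }
    }
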
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 22750ec..01dfe04 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTest
 import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hadoop.thirdparty.com.google.common.net.InetAddresses;
 
+import org.apache.hadoop.fs.StorageType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -1978,6 +1979,11 @@ public class DatanodeManager {
         }
         return avgLoad;
       }
+
+      @Override
+      public Map<StorageType, StorageTypeStats> getStorageTypeStats() {
+        return heartbeatManager.getStorageTypeStats();
+      }
     };
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/FSClusterStats.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/FSClusterStats.java
index 556b7fc..1412295 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/FSClusterStats.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/FSClusterStats.java
@@ -18,6 +18,9 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.StorageType;
+
+import java.util.Map;
 
 /**
  * This interface is used for retrieving the load related statistics of
@@ -57,4 +60,10 @@ public interface FSClusterStats {
    *         writes that are currently occurring on the cluster.
    */
   public double getInServiceXceiverAverage();
+
+  /**
+   * Gets the storage statistics per storage type.
+   * @return the storage statistics per storage type.
+   */
+  Map<StorageType, StorageTypeStats> getStorageTypeStats();
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/StorageTypeStats.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/StorageTypeStats.java
index c335ec6..f90dbad 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/StorageTypeStats.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/StorageTypeStats.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.beans.ConstructorProperties;
 
+import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.StorageType;
@@ -39,6 +40,15 @@ public class StorageTypeStats {
   private int nodesInService = 0;
   private StorageType storageType;
 
+  private int nodesInServiceXceiverCount;
+
+  @VisibleForTesting
+  void setDataNodesInServiceXceiverCount(int avgXceiverPerDatanode,
+      int numNodesInService) {
+    this.nodesInService = numNodesInService;
+    this.nodesInServiceXceiverCount = numNodesInService * avgXceiverPerDatanode;
+  }
+
   @ConstructorProperties({"capacityTotal", "capacityUsed", "capacityNonDfsUsed",
       "capacityRemaining", "blockPoolUsed", "nodesInService"})
   public StorageTypeStats(
@@ -101,6 +111,10 @@ public class StorageTypeStats {
     return nodesInService;
   }
 
+  public int getNodesInServiceXceiverCount() {
+    return nodesInServiceXceiverCount;
+  }
+
   StorageTypeStats(StorageType storageType) {
     this.storageType = storageType;
   }
@@ -131,6 +145,7 @@ public class StorageTypeStats {
   void addNode(final DatanodeDescriptor node) {
     if (node.isInService()) {
       nodesInService++;
+      nodesInServiceXceiverCount += node.getXceiverCount();
     }
   }
 
@@ -151,6 +166,7 @@ public class StorageTypeStats {
   void subtractNode(final DatanodeDescriptor node) {
     if (node.isInService()) {
       nodesInService--;
+      nodesInServiceXceiverCount -= node.getXceiverCount();
     }
   }
 }
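
The counter added above is maintained incrementally: a node's xceiver count is
added when it enters service and subtracted when it leaves, once per storage
type the node carries. A toy sketch of that bookkeeping pattern (the enum and
names are illustrative stand-ins, not the Hadoop types):

    import java.util.EnumMap;
    import java.util.Map;

    public class XceiverBookkeeping {
      enum StorageType { DISK, ARCHIVE }

      static final Map<StorageType, Integer> inServiceXceivers =
          new EnumMap<>(StorageType.class);

      // Mirrors StorageTypeStats.addNode: bump the per-type running total
      // when a node carrying this storage type comes into service.
      static void addNode(StorageType type, int xceiverCount) {
        inServiceXceivers.merge(type, xceiverCount, Integer::sum);
      }

      // Mirrors StorageTypeStats.subtractNode.
      static void subtractNode(StorageType type, int xceiverCount) {
        inServiceXceivers.merge(type, -xceiverCount, Integer::sum);
      }

      public static void main(String[] args) {
        addNode(StorageType.DISK, 6);
        addNode(StorageType.DISK, 4);
        subtractNode(StorageType.DISK, 6);
        System.out.println(inServiceXceivers.get(StorageType.DISK)); // 4
      }
    }
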
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index d2cb3c7..e63303f 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -314,6 +314,17 @@
 </property>
 
   <property>
+    <name>dfs.namenode.redundancy.considerLoadByStorageType</name>
+    <value>false</value>
+    <description>
+      Decide if chooseTarget considers the target's load with respect to the
+      storage type. Typically to be used when datanodes contain homogeneous
+      storage types. Irrelevant if dfs.namenode.redundancy.considerLoad is
+      false.
+    </description>
+  </property>
+
+  <property>
     <name>dfs.namenode.redundancy.considerLoad.factor</name>
     <value>2.0</value>
     <description>The factor by which a node's load can exceed the average
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockStatsMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockStatsMXBean.java
index 81549a6..692f8c7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockStatsMXBean.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockStatsMXBean.java
@@ -33,13 +33,18 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -218,4 +223,51 @@ public class TestBlockStatsMXBean {
     storageTypeStats = storageTypeStatsMap.get(StorageType.ARCHIVE);
     assertEquals(3, storageTypeStats.getNodesInService());
   }
+
+  @Test
+  public void testStorageTypeLoad() throws Exception {
+    HeartbeatManager heartbeatManager =
+        cluster.getNamesystem().getBlockManager().getDatanodeManager()
+            .getHeartbeatManager();
+    Map<StorageType, StorageTypeStats> storageTypeStatsMap =
+        heartbeatManager.getStorageTypeStats();
+    DistributedFileSystem dfs = cluster.getFileSystem();
+
+    // Create a file with HOT storage policy.
+    Path hotSpDir = new Path("/HOT");
+    dfs.mkdir(hotSpDir, FsPermission.getDirDefault());
+    dfs.setStoragePolicy(hotSpDir, "HOT");
+    FSDataOutputStream hotSpFileStream =
+        dfs.create(new Path(hotSpDir, "hotFile"));
+    hotSpFileStream.write("Storage Policy Hot".getBytes());
+    hotSpFileStream.hflush();
+
+    // Create a file with COLD storage policy.
+    Path coldSpDir = new Path("/COLD");
+    dfs.mkdir(coldSpDir, FsPermission.getDirDefault());
+    dfs.setStoragePolicy(coldSpDir, "COLD");
+    FSDataOutputStream coldSpFileStream =
+        dfs.create(new Path(coldSpDir, "coldFile"));
+    coldSpFileStream.write("Writing to ARCHIVE storage type".getBytes());
+    coldSpFileStream.hflush();
+
+    // Trigger heartbeats manually to speed up the test.
+    cluster.triggerHeartbeats();
+
+    // The load would be 2*replication since both the
+    // write xceiver & packet responder threads are counted.
+    GenericTestUtils.waitFor(() -> storageTypeStatsMap.get(StorageType.DISK)
+        .getNodesInServiceXceiverCount() == 6, 100, 5000);
+
+    // The count for ARCHIVE should be independent of the value of DISK.
+    GenericTestUtils.waitFor(() -> storageTypeStatsMap.get(StorageType.ARCHIVE)
+        .getNodesInServiceXceiverCount() == 6, 100, 5000);
+
+    // The total count should stay unaffected, i.e. the sum of the load
+    // across all datanodes.
+    GenericTestUtils
+        .waitFor(() -> heartbeatManager.getInServiceXceiverCount() == 12, 100,
+            5000);
+    IOUtils.closeStreams(hotSpFileStream, coldSpFileStream);
+  }
 }
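
The expected values in the waits above are plain arithmetic: each replica of an
open file occupies one write xceiver plus one packet responder on its datanode.
Assuming the default replication factor of 3 (the test's cluster setup is not
shown in this hunk), the counts work out as in this quick check:

    public class ExpectedXceiverCounts {
      public static void main(String[] args) {
        int replication = 3;       // default dfs.replication, assumed here
        int threadsPerReplica = 2; // one write xceiver + one packet responder
        int perStorageType = replication * threadsPerReplica; // 6
        int total = 2 * perStorageType; // 12: one HOT file plus one COLD file
        System.out.println(perStorageType + " per type, " + total + " total");
      }
    }
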
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
index 78629fe..cf5f5a1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
@@ -1618,5 +1619,33 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
     when(node.getXceiverCount()).thenReturn(10);
     assertTrue(bppd.excludeNodeByLoad(node));
 
+    // Enable load check per storage type.
+    conf.setBoolean(DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_KEY,
+        true);
+    bppd.initialize(conf, statistics, null, null);
+    Map<StorageType, StorageTypeStats> storageStats = new HashMap<>();
+    StorageTypeStats diskStorageTypeStats =
+        new StorageTypeStats(StorageType.DISK);
+
+    // Set the total xceiver count to 500 for DISK (10 nodes, 50 each).
+    diskStorageTypeStats.setDataNodesInServiceXceiverCount(50, 10);
+    storageStats.put(StorageType.DISK, diskStorageTypeStats);
+
+    // Set the total xceiver count to 900 for ARCHIVE (90 nodes, 10 each).
+    StorageTypeStats archiveStorageTypeStats =
+        new StorageTypeStats(StorageType.ARCHIVE);
+    archiveStorageTypeStats.setDataNodesInServiceXceiverCount(10, 90);
+    storageStats.put(StorageType.ARCHIVE, archiveStorageTypeStats);
+
+    when(statistics.getStorageTypeStats()).thenReturn(storageStats);
+    when(node.getXceiverCount()).thenReturn(29);
+    when(node.getStorageTypes()).thenReturn(EnumSet.of(StorageType.DISK));
+    when(statistics.getInServiceXceiverAverage()).thenReturn(0.0);
+    // Added for sanity: the total number of datanodes is 100, so the
+    // cluster-wide average xceiver count is (50*10 + 10*90)/100 = 14.
+    when(statistics.getInServiceXceiverAverage()).thenReturn(14.0);
+    when(node.getXceiverCount()).thenReturn(100);
+
+    assertFalse(bppd.excludeNodeByLoad(node));
   }
 }
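
For the record, the mocked numbers above work out as follows; this is a plain
recomputation for the reader, not code from the patch:

    public class MockScenarioMath {
      public static void main(String[] args) {
        // DISK: 10 nodes averaging 50 xceivers each -> 500 in total.
        // ARCHIVE: 90 nodes averaging 10 xceivers each -> 900 in total.
        double clusterAvg = (500.0 + 900.0) / (10 + 90); // 14.0, the mocked value
        // The node under test reports only DISK, so with
        // considerLoadByStorageType the DISK average is used instead.
        double diskAvg = 500.0 / 10;    // 50.0
        double maxLoad = 2.0 * diskAvg; // 100.0 at the default load factor
        int nodeXceivers = 100;
        System.out.println(nodeXceivers > maxLoad); // false -> assertFalse passes
      }
    }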
