You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by we...@apache.org on 2021/03/31 03:26:22 UTC
[hadoop] 01/07: HDFS-14383. Compute datanode load based on
StoragePolicy. Contributed by Ayush Saxena.
This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit 7f20fad41912217c03cdd7f2d54c01e40f49d051
Author: Ayush Saxena <ay...@apache.org>
AuthorDate: Mon Oct 19 10:48:47 2020 +0530
HDFS-14383. Compute datanode load based on StoragePolicy. Contributed by Ayush Saxena.
(cherry picked from commit 2e8cafac3b071fe5b943542827fd8a496b137fa9)
---
.../java/org/apache/hadoop/hdfs/DFSConfigKeys.java | 5 ++
.../BlockPlacementPolicyDefault.java | 55 ++++++++++++++++++++--
.../server/blockmanagement/DatanodeManager.java | 6 +++
.../server/blockmanagement/FSClusterStats.java | 9 ++++
.../server/blockmanagement/StorageTypeStats.java | 16 +++++++
.../src/main/resources/hdfs-default.xml | 11 +++++
.../blockmanagement/TestBlockStatsMXBean.java | 52 ++++++++++++++++++++
.../blockmanagement/TestReplicationPolicy.java | 29 ++++++++++++
8 files changed, 180 insertions(+), 3 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index f7da4cb..75b7389 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -239,6 +239,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY;
public static final boolean DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_DEFAULT =
true;
+ public static final String
+ DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_KEY =
+ "dfs.namenode.redundancy.considerLoadByStorageType";
+ public static final boolean
+ DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_DEFAULT = false;
public static final String DFS_NAMENODE_READ_CONSIDERLOAD_KEY =
"dfs.namenode.read.considerLoad";
public static final boolean DFS_NAMENODE_READ_CONSIDERLOAD_DEFAULT =
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
index a1a83b0..5761690 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_KEY;
import static org.apache.hadoop.util.Time.monotonicNow;
import java.util.*;
@@ -92,7 +94,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
}
}
- protected boolean considerLoad;
+ protected boolean considerLoad;
+ private boolean considerLoadByStorageType;
protected double considerLoadFactor;
private boolean preferLocalNode;
protected NetworkTopology clusterMap;
@@ -116,6 +119,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
this.considerLoad = conf.getBoolean(
DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY,
DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_DEFAULT);
+ this.considerLoadByStorageType = conf.getBoolean(
+ DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_KEY,
+ DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_DEFAULT);
this.considerLoadFactor = conf.getDouble(
DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_FACTOR,
DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_FACTOR_DEFAULT);
@@ -976,8 +982,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
* @return Return true if the datanode should be excluded, otherwise false
*/
boolean excludeNodeByLoad(DatanodeDescriptor node){
- final double maxLoad = considerLoadFactor *
- stats.getInServiceXceiverAverage();
+ double inServiceXceiverCount = getInServiceXceiverAverage(node);
+ final double maxLoad = considerLoadFactor * inServiceXceiverCount;
+
final int nodeLoad = node.getXceiverCount();
if ((nodeLoad > maxLoad) && (maxLoad > 0)) {
logNodeIsNotChosen(node, NodeNotChosenReason.NODE_TOO_BUSY,
@@ -988,6 +995,48 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
}
/**
+ * Gets the inServiceXceiver average count for the cluster, if
+ * considerLoadByStorageType is true, then load is calculated only for the
+ * storage types present on the datanode.
+ * @param node the datanode whose storage types are to be taken into account.
+ * @return the InServiceXceiverAverage count.
+ */
+ private double getInServiceXceiverAverage(DatanodeDescriptor node) {
+ if (!considerLoadByStorageType) {
+ // Cluster-wide average over all in-service datanodes.
+ return stats.getInServiceXceiverAverage();
+ }
+ // Average derived only from the per-storage-type stats of the
+ // storage types present on this datanode.
+ return getInServiceXceiverAverageByStorageType(
+ node.getStorageTypes());
+ }
+
+ /**
+ * Gets the average xceiver count with respect to the storage types.
+ * @param storageTypes the storage types.
+ * @return the average xceiver count wrt the provided storage types.
+ */
+ private double getInServiceXceiverAverageByStorageType(
+ Set<StorageType> storageTypes) {
+ final Map<StorageType, StorageTypeStats> storageStats =
+ stats.getStorageTypeStats();
+ int numNodes = 0;
+ long numXceiver = 0;
+ for (StorageType s : storageTypes) {
+ StorageTypeStats storageTypeStats = storageStats.get(s);
+ if (storageTypeStats == null) {
+ // No stats reported for this storage type; skip it rather than NPE.
+ continue;
+ }
+ numNodes += storageTypeStats.getNodesInService();
+ numXceiver += storageTypeStats.getNodesInServiceXceiverCount();
+ }
+ // Guard against division by zero; a 0 average never excludes a node.
+ return numNodes == 0 ? 0 : (double) numXceiver / numNodes;
+ }
+
+ /**
* Determine if a datanode is good for placing block.
*
* @param node The target datanode
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 22750ec..01dfe04 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTest
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.net.InetAddresses;
+import org.apache.hadoop.fs.StorageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -1978,6 +1979,11 @@ public class DatanodeManager {
}
return avgLoad;
}
+
+ @Override
+ public Map<StorageType, StorageTypeStats> getStorageTypeStats() {
+ return heartbeatManager.getStorageTypeStats(); // Delegates to the heartbeat manager's per-storage-type stats.
+ }
};
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/FSClusterStats.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/FSClusterStats.java
index 556b7fc..1412295 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/FSClusterStats.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/FSClusterStats.java
@@ -18,6 +18,9 @@
package org.apache.hadoop.hdfs.server.blockmanagement;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.StorageType;
+
+import java.util.Map;
/**
* This interface is used for retrieving the load related statistics of
@@ -57,4 +60,10 @@ public interface FSClusterStats {
* writes that are currently occurring on the cluster.
*/
public double getInServiceXceiverAverage();
+
+ /**
+ * Gets the storage statistics of the cluster, keyed by storage type.
+ * @return a map from each storage type to its {@link StorageTypeStats}.
+ */
+ Map<StorageType, StorageTypeStats> getStorageTypeStats();
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/StorageTypeStats.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/StorageTypeStats.java
index c335ec6..f90dbad 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/StorageTypeStats.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/StorageTypeStats.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
import java.beans.ConstructorProperties;
+import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.StorageType;
@@ -39,6 +40,15 @@ public class StorageTypeStats {
private int nodesInService = 0;
private StorageType storageType;
+ private int nodesInServiceXceiverCount; // Sum of xceiver counts over in-service nodes.
+
+ @VisibleForTesting
+ void setDataNodesInServiceXceiverCount(int avgXceiverPerDatanode,
+ int numNodesInService) {
+ this.nodesInServiceXceiverCount = numNodesInService * avgXceiverPerDatanode;
+ this.nodesInService = numNodesInService;
+ }
+
@ConstructorProperties({"capacityTotal", "capacityUsed", "capacityNonDfsUsed",
"capacityRemaining", "blockPoolUsed", "nodesInService"})
public StorageTypeStats(
@@ -101,6 +111,10 @@ public class StorageTypeStats {
return nodesInService;
}
+ public int getNodesInServiceXceiverCount() {
+ return nodesInServiceXceiverCount; // Total xceivers summed by addNode/subtractNode for in-service nodes.
+ }
+
StorageTypeStats(StorageType storageType) {
this.storageType = storageType;
}
@@ -131,6 +145,7 @@ public class StorageTypeStats {
void addNode(final DatanodeDescriptor node) {
if (node.isInService()) {
nodesInService++;
+ nodesInServiceXceiverCount += node.getXceiverCount(); // NOTE(review): assumes getXceiverCount() is unchanged until the matching subtractNode, else this sum drifts — confirm caller ordering.
}
}
@@ -151,6 +166,7 @@ public class StorageTypeStats {
void subtractNode(final DatanodeDescriptor node) {
if (node.isInService()) {
nodesInService--;
+ nodesInServiceXceiverCount -= node.getXceiverCount(); // NOTE(review): must see the same xceiver count that addNode added — confirm caller ordering.
}
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index d2cb3c7..e63303f 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -314,6 +314,17 @@
</property>
<property>
+ <name>dfs.namenode.redundancy.considerLoadByStorageType</name>
+ <value>false</value>
+ <description>
+ Decide if chooseTarget considers the target's load with respect to the
+ storage type. Typically to be used when datanodes contain homogeneous
+ storage types. Irrelevant if dfs.namenode.redundancy.considerLoad is
+ false.
+ </description>
+ </property>
+
+ <property>
<name>dfs.namenode.redundancy.considerLoad.factor</name>
<value>2.0</value>
<description>The factor by which a node's load can exceed the average
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockStatsMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockStatsMXBean.java
index 81549a6..692f8c7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockStatsMXBean.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockStatsMXBean.java
@@ -33,13 +33,18 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
@@ -218,4 +223,51 @@ public class TestBlockStatsMXBean {
storageTypeStats = storageTypeStatsMap.get(StorageType.ARCHIVE);
assertEquals(3, storageTypeStats.getNodesInService());
}
+
+ @Test
+ public void testStorageTypeLoad() throws Exception {
+ HeartbeatManager heartbeatManager =
+ cluster.getNamesystem().getBlockManager().getDatanodeManager()
+ .getHeartbeatManager();
+ Map<StorageType, StorageTypeStats> storageTypeStatsMap =
+ heartbeatManager.getStorageTypeStats();
+ DistributedFileSystem dfs = cluster.getFileSystem();
+ FSDataOutputStream hotSpFileStream = null;
+ FSDataOutputStream coldSpFileStream = null;
+ try {
+ // Create a file with HOT storage policy.
+ Path hotSpDir = new Path("/HOT");
+ dfs.mkdir(hotSpDir, FsPermission.getDirDefault());
+ dfs.setStoragePolicy(hotSpDir, "HOT");
+ hotSpFileStream = dfs.create(new Path(hotSpDir, "hotFile"));
+ hotSpFileStream.write("Storage Policy Hot".getBytes());
+ hotSpFileStream.hflush();
+
+ // Create a file with COLD storage policy.
+ Path coldSpDir = new Path("/COLD");
+ dfs.mkdir(coldSpDir, FsPermission.getDirDefault());
+ dfs.setStoragePolicy(coldSpDir, "COLD");
+ coldSpFileStream = dfs.create(new Path(coldSpDir, "coldFile"));
+ coldSpFileStream.write("Writing to ARCHIVE storage type".getBytes());
+ coldSpFileStream.hflush();
+
+ // Trigger heartbeats manually to speed up the test.
+ cluster.triggerHeartbeats();
+ // The load would be 2*replication since both the
+ // write xceiver & packet responder threads are counted.
+ GenericTestUtils.waitFor(() -> storageTypeStatsMap.get(StorageType.DISK)
+ .getNodesInServiceXceiverCount() == 6, 100, 5000);
+ // The count for ARCHIVE should be independent of the value of DISK.
+ GenericTestUtils.waitFor(() -> storageTypeStatsMap.get(StorageType.ARCHIVE)
+ .getNodesInServiceXceiverCount() == 6, 100, 5000);
+
+ // The total count should stay unaffected, that is sum of load from all
+ // datanodes.
+ GenericTestUtils
+ .waitFor(() -> heartbeatManager.getInServiceXceiverCount() == 12, 100,
+ 5000);
+ } finally {
+ // Close on every path (including waitFor timeouts); null-safe.
+ IOUtils.closeStreams(hotSpFileStream, coldSpFileStream);
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
index 78629fe..cf5f5a1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
@@ -1618,5 +1619,33 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
when(node.getXceiverCount()).thenReturn(10);
assertTrue(bppd.excludeNodeByLoad(node));
+ // Enable load check per storage type.
+ conf.setBoolean(DFS_NAMENODE_REDUNDANCY_CONSIDERLOADBYSTORAGETYPE_KEY,
+ true);
+ bppd.initialize(conf, statistics, null, null);
+ Map<StorageType, StorageTypeStats> storageStats = new HashMap<>();
+ StorageTypeStats diskStorageTypeStats =
+ new StorageTypeStats(StorageType.DISK);
+ // Set xceiver count as 500 for DISK (10 nodes * 50 xceivers each).
+ diskStorageTypeStats.setDataNodesInServiceXceiverCount(50, 10);
+ storageStats.put(StorageType.DISK, diskStorageTypeStats);
+
+ // Set xceiver count as 900 for ARCHIVE (90 nodes * 10 xceivers each).
+ StorageTypeStats archiveStorageTypeStats =
+ new StorageTypeStats(StorageType.ARCHIVE);
+ archiveStorageTypeStats.setDataNodesInServiceXceiverCount(10, 90);
+ storageStats.put(StorageType.ARCHIVE, archiveStorageTypeStats);
+ when(statistics.getStorageTypeStats()).thenReturn(storageStats);
+ when(node.getStorageTypes()).thenReturn(EnumSet.of(StorageType.DISK));
+ // Cluster-wide: 100 datanodes, average xceiver (50*10+10*90)/100 = 14,
+ // so a load of 100 would be excluded if the global average were used.
+ when(statistics.getInServiceXceiverAverage()).thenReturn(14.0);
+ when(node.getXceiverCount()).thenReturn(100);
+ // DISK-only average is 500/10 = 50; with the default factor 2.0,
+ // maxLoad = 100 and a load of 100 is not excluded.
+ assertFalse(bppd.excludeNodeByLoad(node));
+ // A load of 101 exceeds the DISK-only limit and must be excluded.
+ when(node.getXceiverCount()).thenReturn(101);
+ assertTrue(bppd.excludeNodeByLoad(node));
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org