Posted to common-commits@hadoop.apache.org by we...@apache.org on 2019/11/15 20:17:39 UTC
[hadoop] branch trunk updated: HDFS-14882. Consider DataNode load when #getBlockLocation. Contributed by Xiaoqiao He.
This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new c892a87 HDFS-14882. Consider DataNode load when #getBlockLocation. Contributed by Xiaoqiao He.
c892a87 is described below
commit c892a879ddce3abfd51c8609c81148bf6e4f9daa
Author: He Xiaoqiao <he...@apache.org>
AuthorDate: Fri Nov 15 12:15:33 2019 -0800
HDFS-14882. Consider DataNode load when #getBlockLocation. Contributed by Xiaoqiao He.
Signed-off-by: Wei-Chiu Chuang <we...@apache.org>
Reviewed-by: Inigo Goiri <in...@apache.org>
Reviewed-by: Istvan Fajth <pi...@cloudera.com>
---
.../org/apache/hadoop/net/NetworkTopology.java | 68 ++++++++++++++----
.../java/org/apache/hadoop/hdfs/DFSConfigKeys.java | 4 ++
.../server/blockmanagement/DatanodeManager.java | 24 ++++++-
.../src/main/resources/hdfs-default.xml | 13 +++-
.../blockmanagement/TestDatanodeManager.java | 82 ++++++++++++++++++++++
.../org/apache/hadoop/net/TestNetworkTopology.java | 34 ++++++---
6 files changed, 199 insertions(+), 26 deletions(-)
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
index 724cec3..66799f5 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
@@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
/** The class represents a cluster of computers with a tree hierarchical
* network topology.
@@ -874,11 +875,33 @@ public class NetworkTopology {
* This method is called if the reader is a datanode,
* so nonDataNodeReader flag is set to false.
*/
- sortByDistance(reader, nodes, activeLen, false);
+ sortByDistance(reader, nodes, activeLen, list -> Collections.shuffle(list));
}
/**
- * Sort nodes array by network distance to <i>reader</i>.
+ * Sort nodes array by network distance to <i>reader</i> with secondary sort.
+ * <p>
+ * In a three-level topology, a node can be either local, on the same rack,
+ * or on a different rack from the reader. Sorting the nodes based on network
+ * distance from the reader reduces network traffic and improves
+ * performance.
+ * <p>
+ * As an additional twist, we also randomize the nodes at each network
+ * distance. This helps with load balancing when there is data skew.
+ *
+ * @param reader Node where data will be read
+ * @param nodes Available replicas with the requested data
+ * @param activeLen Number of active nodes at the front of the array
+ * @param secondarySort a secondary sorting strategy, injected by the
+ * caller, to break ties among nodes at the same network distance.
+ */
+ public <T extends Node> void sortByDistance(Node reader, T[] nodes,
+ int activeLen, Consumer<List<T>> secondarySort) {
+ sortByDistance(reader, nodes, activeLen, secondarySort, false);
+ }
+
+ /**
+ * Sort nodes array by network distance to <i>reader</i> with secondary sort.
* <p> using network location. This is used when the reader
* is not a datanode. Sorting the nodes based on network distance
* from the reader reduces network traffic and improves
@@ -895,7 +918,27 @@ public class NetworkTopology {
* This method is called if the reader is not a datanode,
* so nonDataNodeReader flag is set to true.
*/
- sortByDistance(reader, nodes, activeLen, true);
+ sortByDistanceUsingNetworkLocation(reader, nodes, activeLen,
+ list -> Collections.shuffle(list));
+ }
+
+ /**
+ * Sort nodes array by network distance to <i>reader</i> with secondary
+ * sort, using network location. This is used when the reader
+ * is not a datanode. Sorting the nodes based on network distance
+ * from the reader reduces network traffic and improves
+ * performance.
+ * <p>
+ *
+ * @param reader Node where data will be read
+ * @param nodes Available replicas with the requested data
+ * @param activeLen Number of active nodes at the front of the array
+ * @param secondarySort a secondary sorting strategy, injected by the
+ * caller, to break ties among nodes at the same network distance.
+ */
+ public <T extends Node> void sortByDistanceUsingNetworkLocation(Node reader,
+ T[] nodes, int activeLen, Consumer<List<T>> secondarySort) {
+ sortByDistance(reader, nodes, activeLen, secondarySort, true);
}
/**
@@ -909,7 +952,8 @@ public class NetworkTopology {
* @param activeLen Number of active nodes at the front of the array
* @param nonDataNodeReader True if the reader is not a datanode
*/
- private void sortByDistance(Node reader, Node[] nodes, int activeLen,
+ private <T extends Node> void sortByDistance(Node reader, T[] nodes,
+ int activeLen, Consumer<List<T>> secondarySort,
boolean nonDataNodeReader) {
/** Sort weights for the nodes array */
int[] weights = new int[activeLen];
@@ -921,23 +965,23 @@ public class NetworkTopology {
}
}
// Add weight/node pairs to a TreeMap to sort
- TreeMap<Integer, List<Node>> tree = new TreeMap<Integer, List<Node>>();
+ TreeMap<Integer, List<T>> tree = new TreeMap<>();
for (int i=0; i<activeLen; i++) {
int weight = weights[i];
- Node node = nodes[i];
- List<Node> list = tree.get(weight);
+ T node = nodes[i];
+ List<T> list = tree.get(weight);
if (list == null) {
list = Lists.newArrayListWithExpectedSize(1);
tree.put(weight, list);
}
list.add(node);
}
-
+ // Sort nodes which have the same weight using secondarySort.
int idx = 0;
- for (List<Node> list: tree.values()) {
+ for (List<T> list: tree.values()) {
if (list != null) {
- Collections.shuffle(list, r);
- for (Node n: list) {
+ secondarySort.accept(list);
+ for (T n: list) {
nodes[idx] = n;
idx++;
}
@@ -946,4 +990,4 @@ public class NetworkTopology {
Preconditions.checkState(idx == activeLen,
"Sorted the wrong number of nodes!");
}
-}
+}
\ No newline at end of file
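For callers outside DatanodeManager, the new public overload makes the
tie-breaking step pluggable. A minimal sketch of calling it with a
load-based tie-breaker, assuming the caller already holds a
NetworkTopology instance, a reader Node, and the replica array (the
comparator mirrors the one DatanodeManager wires in below):

    import java.util.Comparator;
    import java.util.List;
    import java.util.function.Consumer;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
    import org.apache.hadoop.net.NetworkTopology;
    import org.apache.hadoop.net.Node;

    class LoadAwareSortSketch {
      // Sort replicas by network distance from the reader; among replicas
      // at the same distance, prefer the DataNode with the fewest active
      // transceivers instead of shuffling.
      static void sortByDistanceThenLoad(NetworkTopology topology,
          Node reader, DatanodeInfo[] locations) {
        Consumer<List<DatanodeInfo>> byLoad = list ->
            list.sort(Comparator.comparingInt(DatanodeInfo::getXceiverCount));
        topology.sortByDistance(reader, locations, locations.length, byLoad);
      }
    }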
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index a2df317..1c3a71f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -230,6 +230,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY;
public static final boolean DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_DEFAULT =
true;
+ public static final String DFS_NAMENODE_READ_CONSIDERLOAD_KEY =
+ "dfs.namenode.read.considerLoad";
+ public static final boolean DFS_NAMENODE_READ_CONSIDERLOAD_DEFAULT =
+ false;
public static final String DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_FACTOR =
"dfs.namenode.redundancy.considerLoad.factor";
public static final double
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 049c949..8adb03d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -66,6 +65,7 @@ import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
/**
* Manage datanodes, include decommission and other activities.
@@ -134,6 +134,9 @@ public class DatanodeManager {
/** Whether or not to avoid using stale DataNodes for reading */
private final boolean avoidStaleDataNodesForRead;
+ /** Whether or not to consider load for reading. */
+ private final boolean readConsiderLoad;
+
/**
* Whether or not to avoid using stale DataNodes for writing.
* Note that, even if this is configured, the policy may be
@@ -314,6 +317,9 @@ public class DatanodeManager {
this.avoidStaleDataNodesForRead = conf.getBoolean(
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY,
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_DEFAULT);
+ this.readConsiderLoad = conf.getBoolean(
+ DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERLOAD_KEY,
+ DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERLOAD_DEFAULT);
this.avoidStaleDataNodesForWrite = conf.getBoolean(
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY,
DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT);
@@ -530,9 +536,10 @@ public class DatanodeManager {
int activeLen = lastActiveIndex + 1;
if(nonDatanodeReader) {
networktopology.sortByDistanceUsingNetworkLocation(client,
- lb.getLocations(), activeLen);
+ lb.getLocations(), activeLen, createSecondaryNodeSorter());
} else {
- networktopology.sortByDistance(client, lb.getLocations(), activeLen);
+ networktopology.sortByDistance(client, lb.getLocations(), activeLen,
+ createSecondaryNodeSorter());
}
// move PROVIDED storage to the end to prefer local replicas.
lb.moveProvidedToEnd(activeLen);
@@ -540,6 +547,17 @@ public class DatanodeManager {
lb.updateCachedStorageInfo();
}
+ private Consumer<List<DatanodeInfo>> createSecondaryNodeSorter() {
+ Consumer<List<DatanodeInfo>> secondarySort =
+ list -> Collections.shuffle(list);
+ if (readConsiderLoad) {
+ Comparator<DatanodeInfo> comp =
+ Comparator.comparingInt(DatanodeInfo::getXceiverCount);
+ secondarySort = list -> Collections.sort(list, comp);
+ }
+ return secondarySort;
+ }
+
/** @return the datanode descriptor for the host. */
public DatanodeDescriptor getDatanodeByHost(final String host) {
return host2DatanodeMap.getDatanodeByHost(host);
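The new createSecondaryNodeSorter() helper keeps shuffling as the default
tie-breaker and only swaps in the load comparator when
dfs.namenode.read.considerLoad is enabled, so read ordering is unchanged
unless an operator opts in. A toy, Hadoop-free sketch of the ordering the
comparator produces (plain ints stand in for the xceiver counts of
equally distant replicas):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.List;

    class TieBreakDemo {
      public static void main(String[] args) {
        // Stand-ins for xceiver counts of three equally distant replicas.
        List<Integer> loads = new ArrayList<>(Arrays.asList(4, 1, 3));
        // Ascending load: the least loaded replica is tried first.
        loads.sort(Comparator.comparingInt(Integer::intValue));
        System.out.println(loads); // [1, 3, 4]
      }
    }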
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 98b91a6..7effbd0 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -307,7 +307,9 @@
<property>
<name>dfs.namenode.redundancy.considerLoad</name>
<value>true</value>
- <description>Decide if chooseTarget considers the target's load or not
+ <description>
+ Decides whether chooseTarget considers the target's load when writing.
+ On by default.
</description>
</property>
@@ -320,6 +322,15 @@
</property>
<property>
+ <name>dfs.namenode.read.considerLoad</name>
+ <value>false</value>
+ <description>
+ Decides whether sorting block locations considers the target's load when reading.
+ Off by default.
+ </description>
+</property>
+
+<property>
<name>dfs.datanode.httpserver.filter.handlers</name>
<value>org.apache.hadoop.hdfs.server.datanode.web.RestCsrfPreventionFilterHandler</value>
<description>Comma separated list of Netty servlet-style filter handlers to inject into the Datanode WebHDFS I/O path
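To opt in on a live cluster, the new key would go into hdfs-site.xml on
the NameNode. A sketch: the key name and default come straight from this
patch, and since readConsiderLoad is a final field set in the
DatanodeManager constructor, a NameNode restart to pick it up is assumed:

    <property>
      <name>dfs.namenode.read.considerLoad</name>
      <value>true</value>
    </property>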
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
index 210e434..e8e6b94 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
@@ -518,6 +518,88 @@ public class TestDatanodeManager {
assertEquals(locs[4].getIpAddr(), sortedLocs2[0].getIpAddr());
}
+ @Test
+ public void testGetBlockLocationConsiderLoad()
+ throws IOException, URISyntaxException {
+ Configuration conf = new Configuration();
+ conf.setBoolean(
+ DFSConfigKeys.DFS_NAMENODE_READ_CONSIDERLOAD_KEY, true);
+ conf.setBoolean(
+ DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true);
+ FSNamesystem fsn = Mockito.mock(FSNamesystem.class);
+ Mockito.when(fsn.hasWriteLock()).thenReturn(true);
+ URL shellScript = getClass().getResource(
+ "/" + Shell.appendScriptExtension("topology-script"));
+ Path resourcePath = Paths.get(shellScript.toURI());
+ FileUtil.setExecutable(resourcePath.toFile(), true);
+ conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY,
+ resourcePath.toString());
+ DatanodeManager dm = mockDatanodeManager(fsn, conf);
+
+ int totalDNs = 5;
+ // Register 5 datanodes, 2 per rack, each with a different load.
+ DatanodeInfo[] locs = new DatanodeInfo[totalDNs];
+ String[] storageIDs = new String[totalDNs];
+ for (int i = 0; i < totalDNs; i++) {
+ // Register new datanode.
+ String uuid = "UUID-" + i;
+ String ip = "IP-" + i / 2 + "-" + i;
+ DatanodeRegistration dr = Mockito.mock(DatanodeRegistration.class);
+ Mockito.when(dr.getDatanodeUuid()).thenReturn(uuid);
+ Mockito.when(dr.getIpAddr()).thenReturn(ip);
+ dm.registerDatanode(dr);
+
+ // Get location and storage information.
+ locs[i] = dm.getDatanode(uuid);
+ storageIDs[i] = "storageID-" + i;
+
+ // Set load for datanodes.
+ locs[i].setXceiverCount(i);
+ }
+
+ // Set node 0 decommissioned.
+ locs[0].setDecommissioned();
+
+ // Create LocatedBlock with above locations.
+ ExtendedBlock b = new ExtendedBlock("somePoolID", 1234);
+ LocatedBlock block = new LocatedBlock(b, locs);
+ List<LocatedBlock> blocks = new ArrayList<>();
+ blocks.add(block);
+
+ // Test client located at locs[3] in cluster.
+ final String targetIpInCluster = locs[3].getIpAddr();
+ dm.sortLocatedBlocks(targetIpInCluster, blocks);
+ DatanodeInfo[] sortedLocs = block.getLocations();
+ assertEquals(totalDNs, sortedLocs.length);
+ // Ensure the local node is first.
+ assertEquals(targetIpInCluster, sortedLocs[0].getIpAddr());
+ // Ensure the lighter-loaded node sorts first when the distance is the same.
+ assertEquals(locs[3].getIpAddr(), sortedLocs[0].getIpAddr());
+ assertEquals(locs[2].getIpAddr(), sortedLocs[1].getIpAddr());
+ assertEquals(locs[1].getIpAddr(), sortedLocs[2].getIpAddr());
+ assertEquals(locs[4].getIpAddr(), sortedLocs[3].getIpAddr());
+ // Ensure the decommissioned DN was moved to the end.
+ assertThat(sortedLocs[4].getAdminState(),
+ is(DatanodeInfo.AdminStates.DECOMMISSIONED));
+ assertEquals(locs[0].getIpAddr(), sortedLocs[4].getIpAddr());
+
+ // Test client not in cluster but on the same rack as locs[3].
+ final String targetIpNotInCluster = locs[3].getIpAddr() + "-client";
+ dm.sortLocatedBlocks(targetIpNotInCluster, blocks);
+ DatanodeInfo[] sortedLocs2 = block.getLocations();
+ assertEquals(totalDNs, sortedLocs2.length);
+ // Ensure the local rack comes first and, within the same distance,
+ // the lighter-loaded node sorts first.
+ assertEquals(locs[2].getIpAddr(), sortedLocs2[0].getIpAddr());
+ assertEquals(locs[3].getIpAddr(), sortedLocs2[1].getIpAddr());
+ assertEquals(locs[1].getIpAddr(), sortedLocs2[2].getIpAddr());
+ assertEquals(locs[4].getIpAddr(), sortedLocs2[3].getIpAddr());
+ // Ensure the decommissioned DN was moved to the end.
+ assertThat(sortedLocs2[4].getAdminState(),
+ is(DatanodeInfo.AdminStates.DECOMMISSIONED));
+ assertEquals(locs[0].getIpAddr(), sortedLocs2[4].getIpAddr());
+ }
+
/**
* Test whether removing a host from the includes list without adding it to
* the excludes list will exclude it from data node reports.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java
index 9466a75..f16bfb7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java
@@ -241,10 +241,16 @@ public class TestNetworkTopology {
cluster.setRandomSeed(0xDEADBEEF);
cluster.sortByDistance(dataNodes[8], dtestNodes, dtestNodes.length - 2);
assertTrue(dtestNodes[0] == dataNodes[8]);
- assertTrue(dtestNodes[1] == dataNodes[11]);
- assertTrue(dtestNodes[2] == dataNodes[12]);
- assertTrue(dtestNodes[3] == dataNodes[9]);
- assertTrue(dtestNodes[4] == dataNodes[10]);
+ assertTrue(dtestNodes[1] != dtestNodes[2]);
+ assertTrue(dtestNodes[1] == dataNodes[11]
+ || dtestNodes[1] == dataNodes[12]);
+ assertTrue(dtestNodes[2] == dataNodes[11]
+ || dtestNodes[2] == dataNodes[12]);
+ assertTrue(dtestNodes[3] != dtestNodes[4]);
+ assertTrue(dtestNodes[3] == dataNodes[9]
+ || dtestNodes[3] == dataNodes[10]);
+ assertTrue(dtestNodes[4] == dataNodes[9]
+ || dtestNodes[4] == dataNodes[10]);
// array contains local node
testNodes[0] = dataNodes[1];
@@ -331,10 +337,14 @@ public class TestNetworkTopology {
testNodes[2] = dataNodes[8];
Node rackClient = new NodeBase("/d3/r1/25.25.25");
cluster.setRandomSeed(0xDEADBEEF);
- cluster.sortByDistance(rackClient, testNodes, testNodes.length);
+ cluster.sortByDistanceUsingNetworkLocation(rackClient, testNodes,
+ testNodes.length);
assertTrue(testNodes[0] == dataNodes[8]);
- assertTrue(testNodes[1] == dataNodes[5]);
- assertTrue(testNodes[2] == dataNodes[0]);
+ assertTrue(testNodes[1] != testNodes[2]);
+ assertTrue(testNodes[1] == dataNodes[0]
+ || testNodes[1] == dataNodes[5]);
+ assertTrue(testNodes[2] == dataNodes[0]
+ || testNodes[2] == dataNodes[5]);
//Reader is not a datanode , but is in one of the datanode's data center.
testNodes[0] = dataNodes[8];
@@ -342,10 +352,14 @@ public class TestNetworkTopology {
testNodes[2] = dataNodes[0];
Node dcClient = new NodeBase("/d1/r2/25.25.25");
cluster.setRandomSeed(0xDEADBEEF);
- cluster.sortByDistance(dcClient, testNodes, testNodes.length);
+ cluster.sortByDistanceUsingNetworkLocation(dcClient, testNodes,
+ testNodes.length);
assertTrue(testNodes[0] == dataNodes[0]);
- assertTrue(testNodes[1] == dataNodes[5]);
- assertTrue(testNodes[2] == dataNodes[8]);
+ assertTrue(testNodes[1] != testNodes[2]);
+ assertTrue(testNodes[1] == dataNodes[5]
+ || testNodes[1] == dataNodes[8]);
+ assertTrue(testNodes[2] == dataNodes[5]
+ || testNodes[2] == dataNodes[8]);
}