You are viewing a plain text version of this content. The hyperlink to the canonical version was lost in the plain-text conversion.
Posted to common-commits@hadoop.apache.org by xk...@apache.org on 2019/11/15 20:02:12 UTC
[hadoop] branch branch-3.1 updated: HDFS-14973. More strictly
enforce Balancer/Mover/SPS throttling of getBlocks RPCs to NameNodes.
Contributed by Erik Krogen.
This is an automated email from the ASF dual-hosted git repository.
xkrogen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 17779ad HDFS-14973. More strictly enforce Balancer/Mover/SPS throttling of getBlocks RPCs to NameNodes. Contributed by Erik Krogen.
17779ad is described below
commit 17779adb32569bd689a93802512003b6a6816bd4
Author: Erik Krogen <xk...@apache.org>
AuthorDate: Fri Nov 8 08:57:14 2019 -0800
HDFS-14973. More strictly enforce Balancer/Mover/SPS throttling of getBlocks RPCs to NameNodes. Contributed by Erik Krogen.
(cherry picked from b2cc8b6b4a78f31cdd937dc4d1a2255f80c5881e)
(cherry picked from 60655bfe54e138957ef5bbf480a4541bd83152fd)
---
.../java/org/apache/hadoop/hdfs/DFSConfigKeys.java | 8 +++
.../hadoop/hdfs/server/balancer/Dispatcher.java | 37 +-----------
.../hdfs/server/balancer/NameNodeConnector.java | 20 ++++++-
.../src/main/resources/hdfs-default.xml | 11 +++-
.../hadoop/hdfs/server/balancer/TestBalancer.java | 66 ++++++++++++++--------
.../hdfs/server/balancer/TestBalancerRPCDelay.java | 28 ++++++++-
6 files changed, 108 insertions(+), 62 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index ce0c6d3..5bae88c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -591,6 +591,14 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.datanode.metrics.logger.period.seconds";
public static final int DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT =
600;
+ /**
+ * The maximum number of getBlocks RPCs data movement utilities can make to
+ * a NameNode per second. Values <= 0 disable throttling. This affects
+ * anything that uses a NameNodeConnector, i.e., the Balancer, Mover,
+ * and StoragePolicySatisfier.
+ */
+ public static final String DFS_NAMENODE_GETBLOCKS_MAX_QPS_KEY = "dfs.namenode.get-blocks.max-qps";
+ public static final int DFS_NAMENODE_GETBLOCKS_MAX_QPS_DEFAULT = 20;
public static final String DFS_BALANCER_MOVEDWINWIDTH_KEY = "dfs.balancer.movedWinWidth";
public static final long DFS_BALANCER_MOVEDWINWIDTH_DEFAULT = 5400*1000L;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index 060c013..0c60588 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -927,10 +927,8 @@ public class Dispatcher {
* move tasks or it has received enough blocks from the namenode, or the
* elapsed time of the iteration has exceeded the max time limit.
*
- * @param delay - time to sleep before sending getBlocks. Intended to
- * disperse Balancer RPCs to NameNode for large clusters. See HDFS-11384.
*/
- private void dispatchBlocks(long delay) {
+ private void dispatchBlocks() {
this.blocksToReceive = 2 * getScheduledSize();
long previousMoveTimestamp = Time.monotonicNow();
while (getScheduledSize() > 0 && !isIterationOver()
@@ -955,25 +953,15 @@ public class Dispatcher {
if (shouldFetchMoreBlocks()) {
// fetch new blocks
try {
- if(delay > 0) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Sleeping " + delay + " msec.");
- }
- Thread.sleep(delay);
- }
final long received = getBlockList();
if (received == 0) {
return;
}
blocksToReceive -= received;
continue;
- } catch (InterruptedException ignored) {
- // nothing to do
} catch (IOException e) {
LOG.warn("Exception while getting reportedBlock list", e);
return;
- } finally {
- delay = 0L;
}
} else {
// jump out of while-loop after the configured timeout.
@@ -1165,12 +1153,6 @@ public class Dispatcher {
}
/**
- * The best-effort limit on the number of RPCs per second
- * the Balancer will send to the NameNode.
- */
- final static int BALANCER_NUM_RPC_PER_SEC = 20;
-
- /**
* Dispatch block moves for each source. The thread selects blocks to move &
* sends request to proxy source to initiate block move. The process is flow
* controlled. Block selection is blocked if there are too many un-confirmed
@@ -1185,12 +1167,7 @@ public class Dispatcher {
int concurrentThreads = Math.min(sources.size(),
((ThreadPoolExecutor)dispatchExecutor).getCorePoolSize());
assert concurrentThreads > 0 : "Number of concurrent threads is 0.";
- if (LOG.isDebugEnabled()) {
- LOG.debug("Balancer allowed RPCs per sec = " + BALANCER_NUM_RPC_PER_SEC);
- LOG.debug("Balancer concurrent threads = " + concurrentThreads);
- LOG.debug("Disperse Interval sec = " +
- concurrentThreads / BALANCER_NUM_RPC_PER_SEC);
- }
+ LOG.debug("Balancer concurrent dispatcher threads " + concurrentThreads);
// Determine the size of each mover thread pool per target
int threadsPerTarget = maxMoverThreads/targets.size();
@@ -1210,23 +1187,15 @@ public class Dispatcher {
LOG.info("Allocating " + threadsPerTarget + " threads per target.");
}
- long dSec = 0;
final Iterator<Source> i = sources.iterator();
for (int j = 0; j < futures.length; j++) {
final Source s = i.next();
- final long delay = dSec * 1000;
futures[j] = dispatchExecutor.submit(new Runnable() {
@Override
public void run() {
- s.dispatchBlocks(delay);
+ s.dispatchBlocks();
}
});
- // Calculate delay in seconds for the next iteration
- if(j >= concurrentThreads) {
- dSec = 0;
- } else if((j + 1) % BALANCER_NUM_RPC_PER_SEC == 0) {
- dSec++;
- }
}
// wait for all dispatcher threads to finish
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
index 2819137..e02bad98 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
@@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.RateLimiter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -40,6 +41,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities.StreamCapability;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
@@ -50,7 +52,6 @@ import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
import org.apache.hadoop.hdfs.server.protocol.BalancerProtocols;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
-import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
@@ -116,6 +117,7 @@ public class NameNodeConnector implements Closeable {
private final int maxNotChangedIterations;
private int notChangedIterations = 0;
+ private final RateLimiter getBlocksRateLimiter;
public NameNodeConnector(String name, URI nameNodeUri, Path idPath,
List<Path> targetPaths, Configuration conf,
@@ -126,6 +128,16 @@ public class NameNodeConnector implements Closeable {
this.targetPaths = targetPaths == null || targetPaths.isEmpty() ? Arrays
.asList(new Path("/")) : targetPaths;
this.maxNotChangedIterations = maxNotChangedIterations;
+ int getBlocksMaxQps = conf.getInt(
+ DFSConfigKeys.DFS_NAMENODE_GETBLOCKS_MAX_QPS_KEY,
+ DFSConfigKeys.DFS_NAMENODE_GETBLOCKS_MAX_QPS_DEFAULT);
+ if (getBlocksMaxQps > 0) {
+ LOG.info("getBlocks calls for " + nameNodeUri
+ + " will be rate-limited to " + getBlocksMaxQps + " per second");
+ this.getBlocksRateLimiter = RateLimiter.create(getBlocksMaxQps);
+ } else {
+ this.getBlocksRateLimiter = null;
+ }
this.namenode = NameNodeProxies.createProxy(conf, nameNodeUri,
BalancerProtocols.class, fallbackToSimpleAuth).getProxy();
@@ -160,8 +172,10 @@ public class NameNodeConnector implements Closeable {
/** @return blocks with locations. */
public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long
- minBlockSize)
- throws IOException {
+ minBlockSize) throws IOException {
+ if (getBlocksRateLimiter != null) {
+ getBlocksRateLimiter.acquire();
+ }
return namenode.getBlocks(datanode, size, minBlockSize);
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 12bacc0..f0ce288 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -3518,7 +3518,16 @@
HTTPS port for DataNode.
</description>
</property>
-
+<property>
+ <name>dfs.namenode.get-blocks.max-qps</name>
+ <value>20</value>
+ <description>
+ The maximum number of getBlocks RPCs data movement utilities can make to
+ a NameNode per second. Values less than or equal to 0 disable throttling.
+ This affects anything that uses a NameNodeConnector, i.e., the Balancer,
+ Mover, and StoragePolicySatisfier.
+ </description>
+</property>
<property>
<name>dfs.balancer.dispatcherThreads</name>
<value>200</value>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index fa026f0..4b0d653 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -72,6 +72,8 @@ import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
@@ -128,6 +130,7 @@ import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Tool;
import org.apache.log4j.Level;
import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -157,6 +160,16 @@ public class TestBalancer {
private static MiniKdc kdc;
private static File keytabFile;
private MiniDFSCluster cluster;
+ private AtomicInteger numGetBlocksCalls;
+ private AtomicLong startGetBlocksTime;
+ private AtomicLong endGetBlocksTime;
+
+ @Before
+ public void setup() {
+ numGetBlocksCalls = new AtomicInteger(0);
+ startGetBlocksTime = new AtomicLong(Long.MAX_VALUE);
+ endGetBlocksTime = new AtomicLong(Long.MIN_VALUE);
+ }
@After
public void shutdown() throws Exception {
@@ -791,7 +804,7 @@ public class TestBalancer {
long newCapacity, String newRack, NewNodeInfo nodes,
boolean useTool, boolean useFile) throws Exception {
doTest(conf, capacities, racks, newCapacity, newRack, nodes,
- useTool, useFile, false);
+ useTool, useFile, false, 0.3);
}
/** This test start a cluster with specified number of nodes,
@@ -810,12 +823,14 @@ public class TestBalancer {
* @param useFile - if true, the hosts to included or excluded will be stored in a
* file and then later read from the file.
* @param useNamesystemSpy - spy on FSNamesystem if true
+ * @param clusterUtilization - The utilization of the cluster to start, from
+ * 0.0 to 1.0
* @throws Exception
*/
private void doTest(Configuration conf, long[] capacities,
String[] racks, long newCapacity, String newRack, NewNodeInfo nodes,
boolean useTool, boolean useFile,
- boolean useNamesystemSpy) throws Exception {
+ boolean useNamesystemSpy, double clusterUtilization) throws Exception {
LOG.info("capacities = " + long2String(capacities));
LOG.info("racks = " + Arrays.asList(racks));
LOG.info("newCapacity= " + newCapacity);
@@ -845,8 +860,8 @@ public class TestBalancer {
long totalCapacity = sum(capacities);
- // fill up the cluster to be 30% full
- long totalUsedSpace = totalCapacity*3/10;
+ // fill up the cluster to be `clusterUtilization` full
+ long totalUsedSpace = (long) (totalCapacity * clusterUtilization);
createFile(cluster, filePath, totalUsedSpace / numOfDatanodes,
(short) numOfDatanodes, 0);
@@ -2135,33 +2150,34 @@ public class TestBalancer {
}
}
- private static int numGetBlocksCalls;
- private static long startGetBlocksTime, endGetBlocksTime;
-
private void spyFSNamesystem(NameNode nn) throws IOException {
FSNamesystem fsnSpy = NameNodeAdapter.spyOnNamesystem(nn);
- numGetBlocksCalls = 0;
- endGetBlocksTime = startGetBlocksTime = Time.monotonicNow();
doAnswer(new Answer<BlocksWithLocations>() {
@Override
public BlocksWithLocations answer(InvocationOnMock invocation)
throws Throwable {
+ long startTime = Time.monotonicNow();
+ startGetBlocksTime.getAndUpdate((curr) -> Math.min(curr, startTime));
BlocksWithLocations blk =
(BlocksWithLocations)invocation.callRealMethod();
- endGetBlocksTime = Time.monotonicNow();
- numGetBlocksCalls++;
+ long endTime = Time.monotonicNow();
+ endGetBlocksTime.getAndUpdate((curr) -> Math.max(curr, endTime));
+ numGetBlocksCalls.incrementAndGet();
return blk;
}}).when(fsnSpy).getBlocks(any(DatanodeID.class), anyLong(), anyLong());
}
/**
* Test that makes the Balancer to disperse RPCs to the NameNode
- * in order to avoid NN's RPC queue saturation.
+ * in order to avoid NN's RPC queue saturation. This not marked as @Test
+ * because it is run from {@link TestBalancerRPCDelay}.
*/
- void testBalancerRPCDelay() throws Exception {
+ void testBalancerRPCDelay(int getBlocksMaxQps) throws Exception {
final Configuration conf = new HdfsConfiguration();
initConf(conf);
conf.setInt(DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY, 30);
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_GETBLOCKS_MAX_QPS_KEY,
+ getBlocksMaxQps);
int numDNs = 20;
long[] capacities = new long[numDNs];
@@ -2171,16 +2187,22 @@ public class TestBalancer {
racks[i] = (i < numDNs/2 ? RACK0 : RACK1);
}
doTest(conf, capacities, racks, CAPACITY, RACK2,
- new PortNumberBasedNodes(3, 0, 0), false, false, true);
+ // Use only 1 node and set the starting capacity to 50% to allow the
+ // balancing to complete in only one iteration. This is necessary
+ // because the startGetBlocksTime and endGetBlocksTime measures across
+ // all get block calls, so if two iterations are performed, the duration
+ // also includes the time it took to perform the block move ops in the
+ // first iteration
+ new PortNumberBasedNodes(1, 0, 0), false, false, true, 0.5);
assertTrue("Number of getBlocks should be not less than " +
- Dispatcher.BALANCER_NUM_RPC_PER_SEC,
- numGetBlocksCalls > Dispatcher.BALANCER_NUM_RPC_PER_SEC);
- long d = 1 + endGetBlocksTime - startGetBlocksTime;
- LOG.info("Balancer executed " + numGetBlocksCalls
- + " getBlocks in " + d + " msec.");
- assertTrue("Expected BALANCER_NUM_RPC_PER_SEC = " +
- Dispatcher.BALANCER_NUM_RPC_PER_SEC,
- (numGetBlocksCalls * 1000 / d) < Dispatcher.BALANCER_NUM_RPC_PER_SEC);
+ getBlocksMaxQps, numGetBlocksCalls.get() >= getBlocksMaxQps);
+ long durationMs = 1 + endGetBlocksTime.get() - startGetBlocksTime.get();
+ int durationSec = (int) Math.ceil(durationMs / 1000.0);
+ LOG.info("Balancer executed " + numGetBlocksCalls.get() + " getBlocks in "
+ + durationMs + " msec (round up to " + durationSec + " sec)");
+ long getBlockCallsPerSecond = numGetBlocksCalls.get() / durationSec;
+ assertTrue("Expected balancer getBlocks calls per second <= " +
+ getBlocksMaxQps, getBlockCallsPerSecond <= getBlocksMaxQps);
}
/**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerRPCDelay.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerRPCDelay.java
index 960ad25..79c7f87 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerRPCDelay.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerRPCDelay.java
@@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hdfs.server.balancer;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
/**
@@ -25,8 +28,29 @@ import org.junit.Test;
*/
public class TestBalancerRPCDelay {
+ private TestBalancer testBalancer;
+
+ @Before
+ public void setup() {
+ testBalancer = new TestBalancer();
+ testBalancer.setup();
+ }
+
+ @After
+ public void teardown() throws Exception {
+ if (testBalancer != null) {
+ testBalancer.shutdown();
+ }
+ }
+
+ @Test(timeout=100000)
+ public void testBalancerRPCDelayQps3() throws Exception {
+ testBalancer.testBalancerRPCDelay(3);
+ }
+
@Test(timeout=100000)
- public void testBalancerRPCDelay() throws Exception {
- new TestBalancer().testBalancerRPCDelay();
+ public void testBalancerRPCDelayQpsDefault() throws Exception {
+ testBalancer.testBalancerRPCDelay(
+ DFSConfigKeys.DFS_NAMENODE_GETBLOCKS_MAX_QPS_DEFAULT);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org