Posted to common-commits@hadoop.apache.org by ka...@apache.org on 2016/10/26 18:30:52 UTC
[26/50] [abbrv] hadoop git commit: HDFS-11015. Enforce timeout in balancer. Contributed by Kihwal Lee.
HDFS-11015. Enforce timeout in balancer. Contributed by Kihwal Lee.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f6367c5f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f6367c5f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f6367c5f
Branch: refs/heads/YARN-4752
Commit: f6367c5f44a88cb5eb7edffb015b10b657504a61
Parents: 09ef97d
Author: Zhe Zhang <zh...@apache.org>
Authored: Tue Oct 25 10:18:57 2016 -0700
Committer: Zhe Zhang <zh...@apache.org>
Committed: Tue Oct 25 10:19:13 2016 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 2 +
.../hadoop/hdfs/server/balancer/Balancer.java | 5 +-
.../hadoop/hdfs/server/balancer/Dispatcher.java | 49 +++++++++++++++-----
.../src/main/resources/hdfs-default.xml | 15 ++++++
4 files changed, 58 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6367c5f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index d54c109..951ad68 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -496,6 +496,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_BALANCER_ADDRESS_DEFAULT= "0.0.0.0:0";
public static final String DFS_BALANCER_KEYTAB_FILE_KEY = "dfs.balancer.keytab.file";
public static final String DFS_BALANCER_KERBEROS_PRINCIPAL_KEY = "dfs.balancer.kerberos.principal";
+ public static final String DFS_BALANCER_BLOCK_MOVE_TIMEOUT = "dfs.balancer.block-move.timeout";
+ public static final int DFS_BALANCER_BLOCK_MOVE_TIMEOUT_DEFAULT = 0;
public static final String DFS_MOVER_MOVEDWINWIDTH_KEY = "dfs.mover.movedWinWidth";
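The new pair follows the usual DFSConfigKeys convention of a property name plus a matching *_DEFAULT constant. A minimal sketch of the read side, assuming only the constants added above (the value is in milliseconds; the default of 0 leaves the timeout disabled, so callers must guard before enforcing it):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

// Read the timeout the same way Balancer does further down in this commit;
// 0 means "disabled", so only a positive value should ever be enforced.
Configuration conf = new HdfsConfiguration();
int blockMoveTimeout = conf.getInt(
    DFSConfigKeys.DFS_BALANCER_BLOCK_MOVE_TIMEOUT,
    DFSConfigKeys.DFS_BALANCER_BLOCK_MOVE_TIMEOUT_DEFAULT);
if (blockMoveTimeout > 0) {
  // a timeout was configured; enforce it when waiting on block moves
}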
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6367c5f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
index 2037d01..583ade3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
@@ -282,13 +282,16 @@ public class Balancer {
final long getBlocksMinBlockSize = getLongBytes(conf,
DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY,
DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_DEFAULT);
+ final int blockMoveTimeout = conf.getInt(
+ DFSConfigKeys.DFS_BALANCER_BLOCK_MOVE_TIMEOUT,
+ DFSConfigKeys.DFS_BALANCER_BLOCK_MOVE_TIMEOUT_DEFAULT);
this.nnc = theblockpool;
this.dispatcher =
new Dispatcher(theblockpool, p.getIncludedNodes(),
p.getExcludedNodes(), movedWinWidth, moverThreads,
dispatcherThreads, maxConcurrentMovesPerNode, getBlocksSize,
- getBlocksMinBlockSize, conf);
+ getBlocksMinBlockSize, blockMoveTimeout, conf);
this.threshold = p.getThreshold();
this.policy = p.getBalancingPolicy();
this.sourceNodes = p.getSourceNodes();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6367c5f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index e5c5e53..e090174 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -121,6 +121,7 @@ public class Dispatcher {
private final long getBlocksSize;
private final long getBlocksMinBlockSize;
+ private final long blockMoveTimeout;
private final int ioFileBufferSize;
@@ -331,6 +332,11 @@ public class Dispatcher {
getXferAddr(Dispatcher.this.connectToDnViaHostname)),
HdfsConstants.READ_TIMEOUT);
+ // Set read timeout so that it doesn't hang forever against
+ // unresponsive nodes. Datanode normally sends IN_PROGRESS response
+ // twice within the client read timeout period (every 30 seconds by
+ // default). Here, we make it give up after 5 minutes of no response.
+ sock.setSoTimeout(HdfsConstants.READ_TIMEOUT * 5);
sock.setKeepAlive(true);
OutputStream unbufOut = sock.getOutputStream();
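For readers unfamiliar with SO_TIMEOUT semantics, a self-contained sketch (plain java.net, independent of this patch) of the behavior the hunk above relies on: a read that blocks past the timeout fails with SocketTimeoutException instead of hanging forever. The host and port here are hypothetical; HdfsConstants.READ_TIMEOUT is 60 seconds, so READ_TIMEOUT * 5 is the five minutes cited in the comment.

import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;

public class SoTimeoutSketch {
  public static void main(String[] args) throws Exception {
    Socket sock = new Socket();
    sock.connect(new InetSocketAddress("dn.example.com", 9866), 60 * 1000);
    sock.setSoTimeout(5 * 60 * 1000); // give up after 5 minutes of silence
    try (InputStream in = sock.getInputStream()) {
      int b = in.read(); // blocks for at most SO_TIMEOUT ms
      System.out.println("read byte: " + b);
    } catch (SocketTimeoutException e) {
      // The peer stopped responding; surface a failure instead of hanging.
      System.err.println("read timed out: " + e.getMessage());
    }
  }
}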
@@ -386,13 +392,26 @@ public class Dispatcher {
source.getDatanodeInfo().getDatanodeUuid(), proxySource.datanode);
}
+ /** Check whether to give up waiting for a block move response */
+ private boolean stopWaitingForResponse(long startTime) {
+ return source.isIterationOver() ||
+ (blockMoveTimeout > 0 &&
+ (Time.monotonicNow() - startTime > blockMoveTimeout));
+ }
+
/** Receive a reportedBlock copy response from the input stream */
private void receiveResponse(DataInputStream in) throws IOException {
+ long startTime = Time.monotonicNow();
BlockOpResponseProto response =
BlockOpResponseProto.parseFrom(vintPrefixed(in));
while (response.getStatus() == Status.IN_PROGRESS) {
// read intermediate responses
response = BlockOpResponseProto.parseFrom(vintPrefixed(in));
+ // Stop waiting for slow block moves. Even if it stops waiting,
+ // the actual move may continue.
+ if (stopWaitingForResponse(startTime)) {
+ throw new IOException("Block move timed out");
+ }
}
String logInfo = "reportedBlock move is failed";
DataTransferProtoUtil.checkBlockOpStatus(response, logInfo);
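Two things in this hunk deserve a note. First, the elapsed-time check runs only when a new IN_PROGRESS response has been parsed; if the datanode goes completely silent, it is the socket read timeout set earlier that fires. Second, Time.monotonicNow() is used rather than System.currentTimeMillis() so wall-clock adjustments cannot stretch or shrink the deadline. A distilled sketch of the wait loop, with readNextResponse() and isInProgress() as hypothetical stand-ins for the BlockOpResponseProto handling (the real check also gives up once the Source's iteration deadline has passed):

final long startTime = Time.monotonicNow();
Response response = readNextResponse();   // hypothetical helper
while (response.isInProgress()) {         // hypothetical status check
  response = readNextResponse();
  if (blockMoveTimeout > 0
      && Time.monotonicNow() - startTime > blockMoveTimeout) {
    // Give up waiting. The datanode may still complete the move, so a
    // timeout means "outcome unknown", not "the move failed on disk".
    throw new IOException("Block move timed out");
  }
}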
@@ -671,6 +690,7 @@ public class Dispatcher {
private final List<Task> tasks = new ArrayList<Task>(2);
private long blocksToReceive = 0L;
+ private final long startTime = Time.monotonicNow();
/**
* Source blocks point to the objects in {@link Dispatcher#globalBlocks}
* because we want to keep one copy of a block and be aware that the
@@ -682,6 +702,13 @@ public class Dispatcher {
dn.super(storageType, maxSize2Move);
}
+ /**
+ * Check if the iteration is over
+ */
+ public boolean isIterationOver() {
+ return (Time.monotonicNow()-startTime > MAX_ITERATION_TIME);
+ }
+
/** Add a task */
void addTask(Task task) {
Preconditions.checkState(task.target != this,
@@ -838,24 +865,15 @@ public class Dispatcher {
* elapsed time of the iteration has exceeded the max time limit.
*/
private void dispatchBlocks() {
- final long startTime = Time.monotonicNow();
this.blocksToReceive = 2 * getScheduledSize();
- boolean isTimeUp = false;
int noPendingMoveIteration = 0;
- while (!isTimeUp && getScheduledSize() > 0
+ while (getScheduledSize() > 0 && !isIterationOver()
&& (!srcBlocks.isEmpty() || blocksToReceive > 0)) {
if (LOG.isTraceEnabled()) {
LOG.trace(this + " blocksToReceive=" + blocksToReceive
+ ", scheduledSize=" + getScheduledSize()
+ ", srcBlocks#=" + srcBlocks.size());
}
- // check if time is up or not
- if (Time.monotonicNow() - startTime > MAX_ITERATION_TIME) {
- LOG.info("Time up (max time=" + MAX_ITERATION_TIME/1000
- + " seconds). Skipping " + this);
- isTimeUp = true;
- continue;
- }
final PendingMove p = chooseNextMove();
if (p != null) {
// Reset no pending move counter
@@ -902,6 +920,11 @@ public class Dispatcher {
} catch (InterruptedException ignored) {
}
}
+
+ if (isIterationOver()) {
+ LOG.info("The maximum iteration time (" + MAX_ITERATION_TIME/1000
+ + " seconds) has been reached. Stopping " + this);
+ }
}
@Override
@@ -921,13 +944,14 @@ public class Dispatcher {
int dispatcherThreads, int maxConcurrentMovesPerNode, Configuration conf) {
this(nnc, includedNodes, excludedNodes, movedWinWidth,
moverThreads, dispatcherThreads, maxConcurrentMovesPerNode,
- 0L, 0L, conf);
+ 0L, 0L, 0, conf);
}
Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
Set<String> excludedNodes, long movedWinWidth, int moverThreads,
int dispatcherThreads, int maxConcurrentMovesPerNode,
- long getBlocksSize, long getBlocksMinBlockSize, Configuration conf) {
+ long getBlocksSize, long getBlocksMinBlockSize,
+ int blockMoveTimeout, Configuration conf) {
this.nnc = nnc;
this.excludedNodes = excludedNodes;
this.includedNodes = includedNodes;
@@ -942,6 +966,7 @@ public class Dispatcher {
this.getBlocksSize = getBlocksSize;
this.getBlocksMinBlockSize = getBlocksMinBlockSize;
+ this.blockMoveTimeout = blockMoveTimeout;
this.saslClient = new SaslDataTransferClient(conf,
DataTransferSaslUtil.getSaslPropertiesResolver(conf),
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6367c5f/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 483663e..61a7063 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -3228,6 +3228,21 @@
</property>
<property>
+ <name>dfs.balancer.block-move.timeout</name>
+ <value>0</value>
+ <description>
+ Maximum amount of time in milliseconds for a block to move. If this is set
+ greater than 0, the Balancer will stop waiting for a block move to complete
+ after this time. In typical clusters, a timeout of 3 to 5 minutes is
+ reasonable. If timeouts occur for a large proportion of block moves, this
+ value should be increased. It could also be that too much work is being
+ dispatched and many nodes are constantly exceeding the bandwidth limit as
+ a result; in that case, other balancer parameters might need to be adjusted.
+ It is disabled (0) by default.
+ </description>
+</property>
+
+<property>
<name>dfs.block.invalidate.limit</name>
<value>1000</value>
<description>
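A usage note, as a hedged sketch: an operator enables the timeout by setting the same key, dfs.balancer.block-move.timeout, in hdfs-site.xml, and a test can set it programmatically before running the Balancer in-process. The 300000 below is an illustrative value inside the 3-to-5-minute range the description calls reasonable, not a shipped default.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

// Enable a 5-minute block-move timeout for an in-process Balancer run.
Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_BALANCER_BLOCK_MOVE_TIMEOUT, 300000);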