You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2017/07/27 23:48:38 UTC
[07/34] hadoop git commit: HDFS-11742. Improve balancer usability
after HDFS-8818. Contributed by Kihwal Lee
HDFS-11742. Improve balancer usability after HDFS-8818. Contributed by Kihwal Lee
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8e3a992e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8e3a992e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8e3a992e
Branch: refs/heads/HDFS-7240
Commit: 8e3a992eccff26a7344c3f0e719898fa97706b8c
Parents: 3b48f81
Author: Kihwal Lee <ki...@apache.org>
Authored: Fri Jul 21 09:14:19 2017 -0500
Committer: Kihwal Lee <ki...@apache.org>
Committed: Fri Jul 21 09:14:19 2017 -0500
----------------------------------------------------------------------
.../hadoop/hdfs/server/balancer/Dispatcher.java | 36 +++++++++++++++++++-
1 file changed, 35 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e3a992e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index f855e45..9270fde 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -49,6 +49,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -121,6 +122,7 @@ public class Dispatcher {
/** The maximum number of concurrent blocks moves at a datanode */
private final int maxConcurrentMovesPerNode;
+ private final int maxMoverThreads;
private final long getBlocksSize;
private final long getBlocksMinBlockSize;
@@ -139,11 +141,13 @@ public class Dispatcher {
static class Allocator {
private final int max;
private int count = 0;
+ private int lotSize = 1;
Allocator(int max) {
this.max = max;
}
+ /** Allocate specified number of items */
synchronized int allocate(int n) {
final int remaining = max - count;
if (remaining <= 0) {
@@ -155,9 +159,19 @@ public class Dispatcher {
}
}
+ /** Aloocate a single lot of items */
+ int allocate() {
+ return allocate(lotSize);
+ }
+
synchronized void reset() {
count = 0;
}
+
+ /** Set the lot size */
+ synchronized void setLotSize(int lotSize) {
+ this.lotSize = lotSize;
+ }
}
private static class GlobalBlockMap {
@@ -1017,6 +1031,7 @@ public class Dispatcher {
this.dispatchExecutor = dispatcherThreads == 0? null
: Executors.newFixedThreadPool(dispatcherThreads);
this.moverThreadAllocator = new Allocator(moverThreads);
+ this.maxMoverThreads = moverThreads;
this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode;
this.getBlocksSize = getBlocksSize;
@@ -1116,7 +1131,7 @@ public class Dispatcher {
final DDatanode targetDn = p.target.getDDatanode();
ExecutorService moveExecutor = targetDn.getMoveExecutor();
if (moveExecutor == null) {
- final int nThreads = moverThreadAllocator.allocate(maxConcurrentMovesPerNode);
+ final int nThreads = moverThreadAllocator.allocate();
if (nThreads > 0) {
moveExecutor = targetDn.initMoveExecutor(nThreads);
}
@@ -1166,6 +1181,25 @@ public class Dispatcher {
LOG.debug("Disperse Interval sec = " +
concurrentThreads / BALANCER_NUM_RPC_PER_SEC);
}
+
+ // Determine the size of each mover thread pool per target
+ int threadsPerTarget = maxMoverThreads/targets.size();
+ if (threadsPerTarget == 0) {
+ // Some scheduled moves will get ignored as some targets won't have
+ // any threads allocated.
+ moverThreadAllocator.setLotSize(1);
+ LOG.warn(DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY + "=" +
+ maxMoverThreads + " is too small for moving blocks to " +
+ targets.size() + " targets. Balancing may be slower.");
+ } else {
+ if (threadsPerTarget > maxConcurrentMovesPerNode) {
+ threadsPerTarget = maxConcurrentMovesPerNode;
+ LOG.info("Limiting threads per target to the specified max.");
+ }
+ moverThreadAllocator.setLotSize(threadsPerTarget);
+ LOG.info("Allocating " + threadsPerTarget + " threads per target.");
+ }
+
long dSec = 0;
final Iterator<Source> i = sources.iterator();
for (int j = 0; j < futures.length; j++) {
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org