You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2015/09/30 20:22:48 UTC
[14/52] [abbrv] hadoop git commit: HDFS-8899. Erasure Coding: use
threadpool for EC recovery tasks on DataNode. Contributed by Rakesh R.
HDFS-8899. Erasure Coding: use threadpool for EC recovery tasks on DataNode. Contributed by Rakesh R.
Change-Id: I9429706ae3c9b10a9274c07b98da6ed54cce192b
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ced438a4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ced438a4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ced438a4
Branch: refs/heads/HDFS-7240
Commit: ced438a4bf50fe0ac9072c128e18249e6742956a
Parents: c2ebab6
Author: Zhe Zhang <zh...@cloudera.com>
Authored: Tue Sep 15 10:43:13 2015 -0700
Committer: Zhe Zhang <zh...@cloudera.com>
Committed: Tue Sep 15 10:43:13 2015 -0700
----------------------------------------------------------------------
.../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt | 3 ++
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 4 ++-
.../erasurecode/ErasureCodingWorker.java | 33 ++++++++++++++++----
3 files changed, 33 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ced438a4/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 39b5adc..acf62cb 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -424,3 +424,6 @@
HDFS-7351. Document the HDFS Erasure Coding feature.
(umamahesh and Zhe Zhang via wang)
+
+ HDFS-8899. Erasure Coding: use threadpool for EC recovery tasks on DataNode.
+ (Rakesh R via zhz)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ced438a4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index c4dd496..f7cda18 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -400,7 +400,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY = "dfs.datanode.stripedread.buffer.size";
public static final int DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_DEFAULT = 64 * 1024;
public static final String DFS_DATANODE_STRIPED_READ_THRESHOLD_MILLIS_KEY = "dfs.datanode.stripedread.threshold.millis";
- public static final int DFS_DATANODE_STRIPED_READ_THRESHOLD_MILLIS_DEFAULT = 5000; //5s
+ public static final int DFS_DATANODE_STRIPED_READ_THRESHOLD_MILLIS_DEFAULT = 5000; //5s
+ public static final String DFS_DATANODE_STRIPED_BLK_RECOVERY_THREADS_KEY = "dfs.datanode.striped.blockrecovery.threads.size";
+ public static final int DFS_DATANODE_STRIPED_BLK_RECOVERY_THREADS_DEFAULT = 8;
public static final String DFS_DATANODE_DNS_INTERFACE_KEY = "dfs.datanode.dns.interface";
public static final String DFS_DATANODE_DNS_INTERFACE_DEFAULT = "default";
public static final String DFS_DATANODE_DNS_NAMESERVER_KEY = "dfs.datanode.dns.nameserver";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ced438a4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
index f6a5ece..56b54f1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
@@ -38,6 +38,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -92,6 +93,7 @@ public final class ErasureCodingWorker {
private final DataNode datanode;
private final Configuration conf;
+ private ThreadPoolExecutor STRIPED_BLK_RECOVERY_THREAD_POOL;
private ThreadPoolExecutor STRIPED_READ_THREAD_POOL;
private final int STRIPED_READ_THRESHOLD_MILLIS;
private final int STRIPED_READ_BUFFER_SIZE;
@@ -109,6 +111,10 @@ public final class ErasureCodingWorker {
STRIPED_READ_BUFFER_SIZE = conf.getInt(
DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY,
DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_DEFAULT);
+
+ initializeStripedBlkRecoveryThreadPool(conf.getInt(
+ DFSConfigKeys.DFS_DATANODE_STRIPED_BLK_RECOVERY_THREADS_KEY,
+ DFSConfigKeys.DFS_DATANODE_STRIPED_BLK_RECOVERY_THREADS_DEFAULT));
}
private RawErasureDecoder newDecoder(int numDataUnits, int numParityUnits) {
@@ -142,6 +148,25 @@ public final class ErasureCodingWorker {
STRIPED_READ_THREAD_POOL.allowCoreThreadTimeOut(true);
}
+ private void initializeStripedBlkRecoveryThreadPool(int num) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Using striped block recovery; pool threads=" + num);
+ }
+ STRIPED_BLK_RECOVERY_THREAD_POOL = new ThreadPoolExecutor(2, num, 60,
+ TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
+ new Daemon.DaemonFactory() {
+ private final AtomicInteger threadIdx = new AtomicInteger(0);
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = super.newThread(r);
+ t.setName("stripedBlockRecovery-" + threadIdx.getAndIncrement());
+ return t;
+ }
+ });
+ STRIPED_BLK_RECOVERY_THREAD_POOL.allowCoreThreadTimeOut(true);
+ }
+
/**
* Handles the Erasure Coding recovery work commands.
*
@@ -150,12 +175,8 @@ public final class ErasureCodingWorker {
*/
public void processErasureCodingTasks(Collection<BlockECRecoveryInfo> ecTasks) {
for (BlockECRecoveryInfo recoveryInfo : ecTasks) {
- try {
- new Daemon(new ReconstructAndTransferBlock(recoveryInfo)).start();
- } catch (Throwable e) {
- LOG.warn("Failed to recover striped block " +
- recoveryInfo.getExtendedBlock().getLocalBlock(), e);
- }
+ STRIPED_BLK_RECOVERY_THREAD_POOL.submit(new ReconstructAndTransferBlock(
+ recoveryInfo));
}
}