You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2015/09/30 20:22:48 UTC

[14/52] [abbrv] hadoop git commit: HDFS-8899. Erasure Coding: use threadpool for EC recovery tasks on DataNode. Contributed by Rakesh R.

HDFS-8899. Erasure Coding: use threadpool for EC recovery tasks on DataNode. Contributed by Rakesh R.

Change-Id: I9429706ae3c9b10a9274c07b98da6ed54cce192b


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ced438a4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ced438a4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ced438a4

Branch: refs/heads/HDFS-7240
Commit: ced438a4bf50fe0ac9072c128e18249e6742956a
Parents: c2ebab6
Author: Zhe Zhang <zh...@cloudera.com>
Authored: Tue Sep 15 10:43:13 2015 -0700
Committer: Zhe Zhang <zh...@cloudera.com>
Committed: Tue Sep 15 10:43:13 2015 -0700

----------------------------------------------------------------------
 .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt        |  3 ++
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |  4 ++-
 .../erasurecode/ErasureCodingWorker.java        | 33 ++++++++++++++++----
 3 files changed, 33 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ced438a4/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 39b5adc..acf62cb 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -424,3 +424,6 @@
 
     HDFS-7351. Document the HDFS Erasure Coding feature.
     (umamahesh and Zhe Zhang via wang)
+
+    HDFS-8899. Erasure Coding: use threadpool for EC recovery tasks on DataNode.
+    (Rakesh R via zhz)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ced438a4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index c4dd496..f7cda18 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -400,7 +400,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY = "dfs.datanode.stripedread.buffer.size";
   public static final int     DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_DEFAULT = 64 * 1024;
   public static final String  DFS_DATANODE_STRIPED_READ_THRESHOLD_MILLIS_KEY = "dfs.datanode.stripedread.threshold.millis";
-  public static final int     DFS_DATANODE_STRIPED_READ_THRESHOLD_MILLIS_DEFAULT = 5000; //5s 
+  public static final int     DFS_DATANODE_STRIPED_READ_THRESHOLD_MILLIS_DEFAULT = 5000; //5s
+  public static final String  DFS_DATANODE_STRIPED_BLK_RECOVERY_THREADS_KEY = "dfs.datanode.striped.blockrecovery.threads.size";
+  public static final int     DFS_DATANODE_STRIPED_BLK_RECOVERY_THREADS_DEFAULT = 8;
   public static final String  DFS_DATANODE_DNS_INTERFACE_KEY = "dfs.datanode.dns.interface";
   public static final String  DFS_DATANODE_DNS_INTERFACE_DEFAULT = "default";
   public static final String  DFS_DATANODE_DNS_NAMESERVER_KEY = "dfs.datanode.dns.nameserver";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ced438a4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
index f6a5ece..56b54f1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
@@ -38,6 +38,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -92,6 +93,7 @@ public final class ErasureCodingWorker {
   private final DataNode datanode; 
   private final Configuration conf;
 
+  private ThreadPoolExecutor STRIPED_BLK_RECOVERY_THREAD_POOL;
   private ThreadPoolExecutor STRIPED_READ_THREAD_POOL;
   private final int STRIPED_READ_THRESHOLD_MILLIS;
   private final int STRIPED_READ_BUFFER_SIZE;
@@ -109,6 +111,10 @@ public final class ErasureCodingWorker {
     STRIPED_READ_BUFFER_SIZE = conf.getInt(
         DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY,
         DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_DEFAULT);
+
+    initializeStripedBlkRecoveryThreadPool(conf.getInt(
+        DFSConfigKeys.DFS_DATANODE_STRIPED_BLK_RECOVERY_THREADS_KEY,
+        DFSConfigKeys.DFS_DATANODE_STRIPED_BLK_RECOVERY_THREADS_DEFAULT));
   }
   
   private RawErasureDecoder newDecoder(int numDataUnits, int numParityUnits) {
@@ -142,6 +148,25 @@ public final class ErasureCodingWorker {
     STRIPED_READ_THREAD_POOL.allowCoreThreadTimeOut(true);
   }
 
+  private void initializeStripedBlkRecoveryThreadPool(int num) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Using striped block recovery; pool threads=" + num);
+    }
+    STRIPED_BLK_RECOVERY_THREAD_POOL = new ThreadPoolExecutor(2, num, 60,
+        TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
+        new Daemon.DaemonFactory() {
+          private final AtomicInteger threadIdx = new AtomicInteger(0);
+
+          @Override
+          public Thread newThread(Runnable r) {
+            Thread t = super.newThread(r);
+            t.setName("stripedBlockRecovery-" + threadIdx.getAndIncrement());
+            return t;
+          }
+        });
+    STRIPED_BLK_RECOVERY_THREAD_POOL.allowCoreThreadTimeOut(true);
+  }
+
   /**
    * Handles the Erasure Coding recovery work commands.
    * 
@@ -150,12 +175,8 @@ public final class ErasureCodingWorker {
    */
   public void processErasureCodingTasks(Collection<BlockECRecoveryInfo> ecTasks) {
     for (BlockECRecoveryInfo recoveryInfo : ecTasks) {
-      try {
-        new Daemon(new ReconstructAndTransferBlock(recoveryInfo)).start();
-      } catch (Throwable e) {
-        LOG.warn("Failed to recover striped block " + 
-            recoveryInfo.getExtendedBlock().getLocalBlock(), e);
-      }
+      STRIPED_BLK_RECOVERY_THREAD_POOL.submit(new ReconstructAndTransferBlock(
+          recoveryInfo));
     }
   }