Posted to common-commits@hadoop.apache.org by in...@apache.org on 2019/03/28 10:37:47 UTC

[hadoop] branch trunk updated: HDFS-14295. Add Threadpool for DataTransfers. Contributed by David Mollitor.

This is an automated email from the ASF dual-hosted git repository.

inigoiri pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 15d38b1  HDFS-14295. Add Threadpool for DataTransfers. Contributed by David Mollitor.
15d38b1 is described below

commit 15d38b1bf9fbd41658f6980c1a484dd28f746654
Author: Inigo Goiri <in...@apache.org>
AuthorDate: Thu Mar 28 03:37:33 2019 -0700

    HDFS-14295. Add Threadpool for DataTransfers. Contributed by David Mollitor.
---
 .../hadoop/hdfs/server/datanode/DataNode.java      | 51 ++++++++++++++--------
 1 file changed, 34 insertions(+), 17 deletions(-)
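
The change replaces the old pattern of spawning one short-lived Daemon
thread per block transfer with a single cached pool of daemon threads
owned by the DataNode. A minimal, JDK-only sketch of that pattern (not
the commit's code: the commit uses HadoopExecutors.newCachedThreadPool
with Daemon.DaemonFactory, and the class name below is illustrative):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class TransferPoolSketch {
      // A cached pool creates threads on demand, reuses idle ones, and
      // retires threads that stay idle for 60 seconds.
      private final ExecutorService xferService =
          Executors.newCachedThreadPool(r -> {
            Thread t = new Thread(r);
            t.setDaemon(true); // what Daemon.DaemonFactory provides
            return t;
          });

      void transfer(Runnable dataTransferTask) {
        // Before this commit: new Daemon(dataTransferTask).start();
        xferService.execute(dataTransferTask);
      }
    }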

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 4df9225..1e43291 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -214,6 +214,7 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Timer;
 import org.apache.hadoop.util.VersionInfo;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.apache.htrace.core.Tracer;
 import org.eclipse.jetty.util.ajax.JSON;
 
@@ -396,6 +397,8 @@ public class DataNode extends ReconfigurableBase
   private static final double CONGESTION_RATIO = 1.5;
   private DiskBalancer diskBalancer;
 
+  private final ExecutorService xferService;
+
   @Nullable
   private final StorageLocationChecker storageLocationChecker;
 
@@ -436,6 +439,8 @@ public class DataNode extends ReconfigurableBase
     initOOBTimeout();
     storageLocationChecker = null;
     volumeChecker = new DatasetVolumeChecker(conf, new Timer());
+    this.xferService =
+        HadoopExecutors.newCachedThreadPool(new Daemon.DaemonFactory());
   }
 
   /**
@@ -476,6 +481,8 @@ public class DataNode extends ReconfigurableBase
         conf.get("hadoop.hdfs.configuration.version", "UNSPECIFIED");
 
     this.volumeChecker = new DatasetVolumeChecker(conf, new Timer());
+    this.xferService =
+        HadoopExecutors.newCachedThreadPool(new Daemon.DaemonFactory());
 
     // Determine whether we should try to pass file descriptors to clients.
     if (conf.getBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY,
@@ -2081,6 +2088,9 @@ public class DataNode extends ReconfigurableBase
     // wait reconfiguration thread, if any, to exit
     shutdownReconfigurationTask();
 
+    LOG.info("Waiting up to 30 seconds for transfer threads to complete");
+    HadoopExecutors.shutdown(this.xferService, LOG, 15L, TimeUnit.SECONDS);
+
     // wait for all data receiver threads to exit
     if (this.threadGroup != null) {
       int sleepMs = 2;
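
The 30-second log message and the 15-second argument are consistent:
HadoopExecutors.shutdown runs a two-phase shutdown, waiting up to the
given timeout once after shutdown() and once more after shutdownNow(),
so 15 seconds per phase bounds the total at roughly 30. The underlying
JDK pattern, sketched (the method name is illustrative):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.TimeUnit;

    final class ShutdownSketch {
      static void shutdownGracefully(ExecutorService pool, long timeout,
          TimeUnit unit) throws InterruptedException {
        pool.shutdown();       // reject new tasks, let running ones finish
        if (!pool.awaitTermination(timeout, unit)) {
          pool.shutdownNow();  // interrupt in-flight transfers
          pool.awaitTermination(timeout, unit);
        }
      }
    }
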
@@ -2354,16 +2364,16 @@ public class DataNode extends ReconfigurableBase
     
     int numTargets = xferTargets.length;
     if (numTargets > 0) {
-      StringBuilder xfersBuilder = new StringBuilder();
-      for (int i = 0; i < numTargets; i++) {
-        xfersBuilder.append(xferTargets[i]).append(" ");
-      }
-      LOG.info(bpReg + " Starting thread to transfer " + 
-               block + " to " + xfersBuilder);                       
+      final String xferTargetsString =
+          StringUtils.join(" ", Arrays.asList(xferTargets));
+      LOG.info("{} Starting thread to transfer {} to {}", bpReg, block,
+          xferTargetsString);
+
+      final DataTransfer dataTransferTask = new DataTransfer(xferTargets,
+          xferTargetStorageTypes, xferTargetStorageIDs, block,
+          BlockConstructionStage.PIPELINE_SETUP_CREATE, "");
 
-      new Daemon(new DataTransfer(xferTargets, xferTargetStorageTypes,
-          xferTargetStorageIDs, block,
-          BlockConstructionStage.PIPELINE_SETUP_CREATE, "")).start();
+      this.xferService.execute(dataTransferTask);
     }
   }
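
Besides the executor switch, this hunk modernizes the logging: the
hand-rolled StringBuilder loop becomes StringUtils.join, and string
concatenation in the log call becomes SLF4J {} placeholders. A JDK-only
equivalent of that join (the array contents are stand-ins for the
DatanodeInfo targets):

    import java.util.Arrays;
    import java.util.stream.Collectors;

    public class JoinSketch {
      public static void main(String[] args) {
        Object[] xferTargets = {"dn1:9866", "dn2:9866"};
        // Equivalent of StringUtils.join(" ", Arrays.asList(xferTargets))
        String joined = Arrays.stream(xferTargets)
            .map(Object::toString)
            .collect(Collectors.joining(" "));
        System.out.println("Starting thread to transfer block to " + joined);
      }
    }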
 
@@ -3041,15 +3051,22 @@ public class DataNode extends ReconfigurableBase
     b.setNumBytes(visible);
 
     if (targets.length > 0) {
-      Daemon daemon = new Daemon(threadGroup,
-          new DataTransfer(targets, targetStorageTypes, targetStorageIds, b,
-              stage, client));
-      daemon.start();
+      if (LOG.isDebugEnabled()) {
+        final String xferTargetsString =
+            StringUtils.join(" ", Arrays.asList(targets));
+        LOG.debug("Transferring a replica to {}", xferTargetsString);
+      }
+
+      final DataTransfer dataTransferTask = new DataTransfer(targets,
+          targetStorageTypes, targetStorageIds, b, stage, client);
+
+      @SuppressWarnings("unchecked")
+      Future<Void> f = (Future<Void>) this.xferService.submit(dataTransferTask);
       try {
-        daemon.join();
-      } catch (InterruptedException e) {
-        throw new IOException(
-            "Pipeline recovery for " + b + " is interrupted.", e);
+        f.get();
+      } catch (InterruptedException | ExecutionException e) {
+        throw new IOException("Pipeline recovery for " + b + " is interrupted.",
+            e);
       }
     }
   }
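
This path stays synchronous: submit() returns a Future and f.get()
blocks until the transfer finishes, mirroring the old daemon.join().
The unchecked cast is needed because submit(Runnable) is declared to
return Future<?>. One behavioral difference is that Future.get() also
surfaces failures thrown inside the task as ExecutionException, which
daemon.join() would have returned past silently. A small JDK-only
demonstration of that propagation (class name and message are
illustrative):

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class FutureErrorPropagation {
      public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Runnable task = () -> {
          throw new RuntimeException("simulated transfer failure");
        };
        Future<?> f = pool.submit(task);
        try {
          f.get(); // blocks like daemon.join(), but reports failures too
        } catch (ExecutionException e) {
          System.out.println("task failed: " + e.getCause().getMessage());
        }
        pool.shutdown();
      }
    }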

