You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2018/10/02 21:40:41 UTC

[41/50] [abbrv] hadoop git commit: HDFS-13768. Adding replicas to volume map makes DataNode start slowly. Contributed by Surendra Singh Lilhore.

HDFS-13768. Adding replicas to volume map makes DataNode start slowly. Contributed by Surendra Singh Lilhore.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/56893557
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/56893557
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/56893557

Branch: refs/heads/HDFS-13532
Commit: 5689355783de005ebc604f4403dc5129a286bfca
Parents: f6c5ef9
Author: Yiqun Lin <yq...@apache.org>
Authored: Tue Oct 2 09:43:14 2018 +0800
Committer: Yiqun Lin <yq...@apache.org>
Committed: Tue Oct 2 09:43:14 2018 +0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   3 +
 .../hadoop/hdfs/server/datanode/DataNode.java   |   2 +-
 .../datanode/fsdataset/impl/BlockPoolSlice.java | 177 +++++++++++++++++--
 .../datanode/fsdataset/impl/FsDatasetImpl.java  |  14 ++
 .../datanode/fsdataset/impl/FsDatasetUtil.java  |  30 ++--
 .../datanode/fsdataset/impl/FsVolumeList.java   |   5 +-
 .../datanode/fsdataset/impl/ReplicaMap.java     |  25 +++
 .../src/main/resources/hdfs-default.xml         |   9 +
 .../fsdataset/impl/TestFsVolumeList.java        |  64 ++++++-
 9 files changed, 300 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/56893557/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index a7e7b9b..d8024dc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -365,6 +365,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final long    DFS_CONTENT_SUMMARY_SLEEP_MICROSEC_DEFAULT = 500;
   public static final String  DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY = "dfs.datanode.failed.volumes.tolerated";
   public static final int     DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT = 0;
+  public static final String
+      DFS_DATANODE_VOLUMES_REPLICA_ADD_THREADPOOL_SIZE_KEY =
+      "dfs.datanode.volumes.replica-add.threadpool.size";
   public static final String  DFS_DATANODE_SYNCONCLOSE_KEY = "dfs.datanode.synconclose";
   public static final boolean DFS_DATANODE_SYNCONCLOSE_DEFAULT = false;
   public static final String  DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY = "dfs.datanode.socket.reuse.keepalive";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/56893557/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index c980395..270e30b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -1695,7 +1695,7 @@ public class DataNode extends ReconfigurableBase
     return blockPoolManager.get(bpid);
   }
   
-  int getBpOsCount() {
+  public int getBpOsCount() {
     return blockPoolManager.getAllNamenodeThreads().size();
   }
   

http://git-wip-us.apache.org/repos/asf/hadoop/blob/56893557/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index 2adfb6b..b9b581f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -28,8 +28,19 @@ import java.io.InputStream;
 import java.io.OutputStreamWriter;
 import java.io.RandomAccessFile;
 import java.io.Writer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
 import java.util.Scanner;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinTask;
+import java.util.concurrent.RecursiveAction;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.slf4j.Logger;
@@ -52,8 +63,8 @@ import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTracker.RamDiskReplica;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
-
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DiskChecker;
@@ -96,6 +107,17 @@ class BlockPoolSlice {
   private final int maxDataLength;
   private final FileIoProvider fileIoProvider;
 
+  private static ForkJoinPool addReplicaThreadPool = null;
+  private static final int VOLUMES_REPLICA_ADD_THREADPOOL_SIZE = Runtime
+      .getRuntime().availableProcessors();
+  private static final Comparator<File> FILE_COMPARATOR =
+      new Comparator<File>() {
+    @Override
+    public int compare(File f1, File f2) {
+      return f1.getName().compareTo(f2.getName());
+    }
+  };
+
   // TODO:FEDERATION scalability issue - a thread per DU is needed
   private final GetSpaceUsed dfsUsage;
 
@@ -161,13 +183,15 @@ class BlockPoolSlice {
                                                      .setConf(conf)
                                                      .setInitialUsed(loadDfsUsed())
                                                      .build();
-
+    // initialize add replica fork join pool
+    initializeAddReplicaPool(conf);
     // Make the dfs usage to be saved during shutdown.
     shutdownHook = new Runnable() {
       @Override
       public void run() {
         if (!dfsUsedSaved) {
           saveDfsUsed();
+          addReplicaThreadPool.shutdownNow();
         }
       }
     };
@@ -175,6 +199,21 @@ class BlockPoolSlice {
         SHUTDOWN_HOOK_PRIORITY);
   }
 
+  private synchronized void initializeAddReplicaPool(Configuration conf) {
+    if (addReplicaThreadPool == null) {
+      FsDatasetImpl dataset = (FsDatasetImpl) volume.getDataset();
+      int numberOfBlockPoolSlice = dataset.getVolumeCount()
+          * dataset.getBPServiceCount();
+      int poolsize = Math.max(numberOfBlockPoolSlice,
+          VOLUMES_REPLICA_ADD_THREADPOOL_SIZE);
+      // Default pool sizes is max of (volume * number of bp_service) and
+      // number of processor.
+      addReplicaThreadPool = new ForkJoinPool(conf.getInt(
+          DFSConfigKeys.DFS_DATANODE_VOLUMES_REPLICA_ADD_THREADPOOL_SIZE_KEY,
+          poolsize));
+    }
+  }
+
   File getDirectory() {
     return currentDir.getParentFile();
   }
@@ -374,10 +413,55 @@ class BlockPoolSlice {
 
     boolean  success = readReplicasFromCache(volumeMap, lazyWriteReplicaMap);
     if (!success) {
+      List<IOException> exceptions = Collections
+          .synchronizedList(new ArrayList<IOException>());
+      Queue<RecursiveAction> subTaskQueue =
+          new ConcurrentLinkedQueue<RecursiveAction>();
+
       // add finalized replicas
-      addToReplicasMap(volumeMap, finalizedDir, lazyWriteReplicaMap, true);
+      AddReplicaProcessor task = new AddReplicaProcessor(volumeMap,
+          finalizedDir, lazyWriteReplicaMap, true, exceptions, subTaskQueue);
+      ForkJoinTask<Void> finalizedTask = addReplicaThreadPool.submit(task);
+
       // add rbw replicas
-      addToReplicasMap(volumeMap, rbwDir, lazyWriteReplicaMap, false);
+      task = new AddReplicaProcessor(volumeMap, rbwDir, lazyWriteReplicaMap,
+          false, exceptions, subTaskQueue);
+      ForkJoinTask<Void> rbwTask = addReplicaThreadPool.submit(task);
+
+      try {
+        finalizedTask.get();
+        rbwTask.get();
+      } catch (InterruptedException | ExecutionException e) {
+        exceptions.add(new IOException(
+            "Failed to start sub tasks to add replica in replica map :"
+                + e.getMessage()));
+      }
+
+      //wait for all the tasks to finish.
+      waitForSubTaskToFinish(subTaskQueue, exceptions);
+    }
+  }
+
+  /**
+   * Wait till all the recursive task for add replica to volume completed.
+   *
+   * @param subTaskQueue
+   *          {@link AddReplicaProcessor} tasks list.
+   * @param exceptions
+   *          exceptions occurred in sub tasks.
+   * @throws IOException
+   *           throw if any sub task or multiple sub tasks failed.
+   */
+  private void waitForSubTaskToFinish(Queue<RecursiveAction> subTaskQueue,
+      List<IOException> exceptions) throws IOException {
+    while (!subTaskQueue.isEmpty()) {
+      RecursiveAction task = subTaskQueue.poll();
+      if (task != null) {
+        task.join();
+      }
+    }
+    if (!exceptions.isEmpty()) {
+      throw MultipleIOException.createIOException(exceptions);
     }
   }
 
@@ -526,10 +610,10 @@ class BlockPoolSlice {
       }
     }
 
-    ReplicaInfo oldReplica = volumeMap.get(bpid, newReplica.getBlockId());
-    if (oldReplica == null) {
-      volumeMap.add(bpid, newReplica);
-    } else {
+    ReplicaInfo tmpReplicaInfo = volumeMap.addAndGet(bpid, newReplica);
+    ReplicaInfo oldReplica = (tmpReplicaInfo == newReplica) ? null
+        : tmpReplicaInfo;
+    if (oldReplica != null) {
       // We have multiple replicas of the same block so decide which one
       // to keep.
       newReplica = resolveDuplicateReplicas(newReplica, oldReplica, volumeMap);
@@ -558,15 +642,23 @@ class BlockPoolSlice {
    *                                storage.
    * @param isFinalized true if the directory has finalized replicas;
    *                    false if the directory has rbw replicas
+   * @param exceptions list of exception which need to return to parent thread.
+   * @param subTaskQueue queue of sub tasks
    */
   void addToReplicasMap(ReplicaMap volumeMap, File dir,
-                        final RamDiskReplicaTracker lazyWriteReplicaMap,
-                        boolean isFinalized)
+      final RamDiskReplicaTracker lazyWriteReplicaMap, boolean isFinalized,
+      List<IOException> exceptions, Queue<RecursiveAction> subTaskQueue)
       throws IOException {
     File[] files = fileIoProvider.listFiles(volume, dir);
-    for (File file : files) {
+    Arrays.sort(files, FILE_COMPARATOR);
+    for (int i = 0; i < files.length; i++) {
+      File file = files[i];
       if (file.isDirectory()) {
-        addToReplicasMap(volumeMap, file, lazyWriteReplicaMap, isFinalized);
+        // Launch new sub task.
+        AddReplicaProcessor subTask = new AddReplicaProcessor(volumeMap, file,
+            lazyWriteReplicaMap, isFinalized, exceptions, subTaskQueue);
+        subTask.fork();
+        subTaskQueue.add(subTask);
       }
 
       if (isFinalized && FsDatasetUtil.isUnlinkTmpFile(file)) {
@@ -581,7 +673,7 @@ class BlockPoolSlice {
       }
 
       long genStamp = FsDatasetUtil.getGenerationStampFromFile(
-          files, file);
+          files, file, i);
       long blockId = Block.filename2id(file.getName());
       Block block = new Block(blockId, file.length(), genStamp);
       addReplicaToReplicasMap(block, volumeMap, lazyWriteReplicaMap,
@@ -886,4 +978,63 @@ class BlockPoolSlice {
   public long getNumOfBlocks() {
     return numOfBlocks.get();
   }
+
+  /**
+   * Recursive action for add replica in map.
+   */
+  class AddReplicaProcessor extends RecursiveAction {
+
+    private ReplicaMap volumeMap;
+    private File dir;
+    private RamDiskReplicaTracker lazyWriteReplicaMap;
+    private boolean isFinalized;
+    private List<IOException> exceptions;
+    private Queue<RecursiveAction> subTaskQueue;
+
+    /**
+     * @param volumeMap
+     *          the replicas map
+     * @param dir
+     *          an input directory
+     * @param lazyWriteReplicaMap
+     *          Map of replicas on transient storage.
+     * @param isFinalized
+     *          true if the directory has finalized replicas; false if the
+     *          directory has rbw replicas
+     * @param exceptions
+     *          List of exception which need to return to parent thread.
+     * @param subTaskQueue
+     *          queue of sub tasks
+     */
+    AddReplicaProcessor(ReplicaMap volumeMap, File dir,
+        RamDiskReplicaTracker lazyWriteReplicaMap, boolean isFinalized,
+        List<IOException> exceptions, Queue<RecursiveAction> subTaskQueue) {
+      this.volumeMap = volumeMap;
+      this.dir = dir;
+      this.lazyWriteReplicaMap = lazyWriteReplicaMap;
+      this.isFinalized = isFinalized;
+      this.exceptions = exceptions;
+      this.subTaskQueue = subTaskQueue;
+    }
+
+    @Override
+    protected void compute() {
+      try {
+        addToReplicasMap(volumeMap, dir, lazyWriteReplicaMap, isFinalized,
+            exceptions, subTaskQueue);
+      } catch (IOException e) {
+        LOG.warn("Caught exception while adding replicas from " + volume
+            + " in subtask. Will throw later.", e);
+        exceptions.add(e);
+      }
+    }
+  }
+
+  /**
+   * Return the size of fork pool used for adding replica in map.
+   */
+  @VisibleForTesting
+  public static int getAddReplicaForkPoolSize() {
+    return addReplicaThreadPool.getPoolSize();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/56893557/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 27196c2..027a0bf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -3307,6 +3307,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     this.timer = newTimer;
   }
 
+  /**
+   * Return the number of BP service count.
+   */
+  public int getBPServiceCount() {
+    return datanode.getBpOsCount();
+  }
+
+  /**
+   * Return the number of volume.
+   */
+  public int getVolumeCount() {
+    return volumes.getVolumes().size();
+  }
+
   void stopAllDataxceiverThreads(FsVolumeImpl volume) {
     try (AutoCloseableLock lock = datasetLock.acquire()) {
       for (String blockPoolId : volumeMap.getBlockPoolList()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/56893557/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
index 9f115a0..8a3b237 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
@@ -117,21 +117,27 @@ public class FsDatasetUtil {
   }
 
   /**
-   * Find the meta-file for the specified block file
-   * and then return the generation stamp from the name of the meta-file.
+   * Find the meta-file for the specified block file and then return the
+   * generation stamp from the name of the meta-file. Generally meta file will
+   * be the next file in sorted array of file's.
+   *
+   * @param listdir
+   *          sorted list of file based on name.
+   * @param blockFile
+   *          block file for which generation stamp is needed.
+   * @param index
+   *          index of block file in array.
+   * @return generation stamp for block file.
    */
-  static long getGenerationStampFromFile(File[] listdir, File blockFile)
-      throws IOException {
+  static long getGenerationStampFromFile(File[] listdir, File blockFile,
+      int index) {
     String blockName = blockFile.getName();
-    for (int j = 0; j < listdir.length; j++) {
-      String path = listdir[j].getName();
-      if (!path.startsWith(blockName)) {
-        continue;
-      }
-      if (blockFile.getCanonicalPath().equals(listdir[j].getCanonicalPath())) {
-        continue;
+    if ((index + 1) < listdir.length) {
+      // Check if next index file is meta file
+      String metaFile = listdir[index + 1].getName();
+      if (metaFile.startsWith(blockName)) {
+        return Block.getGenerationStamp(metaFile);
       }
-      return Block.getGenerationStamp(listdir[j].getName());
     }
     FsDatasetImpl.LOG.warn("Block " + blockFile + " does not have a metafile!");
     return HdfsConstants.GRANDFATHER_GENERATION_STAMP;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/56893557/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
index a1804ae..a0fcb54 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
@@ -226,8 +226,9 @@ class FsVolumeList {
       throw exceptions.get(0);
     }
     long totalTimeTaken = Time.monotonicNow() - totalStartTime;
-    FsDatasetImpl.LOG.info("Total time to add all replicas to map: "
-        + totalTimeTaken + "ms");
+    FsDatasetImpl.LOG
+        .info("Total time to add all replicas to map for block pool " + bpid
+            + ": " + totalTimeTaken + "ms");
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/56893557/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
index d4fb69b..c630b95 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
@@ -136,6 +136,31 @@ class ReplicaMap {
   }
 
   /**
+   * Add a replica's meta information into the map, if already exist
+   * return the old replicaInfo.
+   */
+  ReplicaInfo addAndGet(String bpid, ReplicaInfo replicaInfo) {
+    checkBlockPool(bpid);
+    checkBlock(replicaInfo);
+    try (AutoCloseableLock l = lock.acquire()) {
+      FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
+      if (set == null) {
+        // Add an entry for block pool if it does not exist already
+        set = new FoldedTreeSet<>();
+        map.put(bpid, set);
+      }
+      ReplicaInfo oldReplicaInfo = set.get(replicaInfo.getBlockId(),
+          LONG_AND_BLOCK_COMPARATOR);
+      if (oldReplicaInfo != null) {
+        return oldReplicaInfo;
+      } else {
+        set.add(replicaInfo);
+      }
+      return replicaInfo;
+    }
+  }
+
+  /**
    * Add all entries from the given replica map into the local replica map.
    */
   void addAll(ReplicaMap other) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/56893557/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 1573582..2ee8399 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -1282,6 +1282,15 @@
 </property>
 
 <property>
+  <name>dfs.datanode.volumes.replica-add.threadpool.size</name>
+  <value></value>
+  <description>Specifies the maximum number of threads to use for
+  adding block in volume. Default value for this configuration is
+  max of (volume * number of bp_service, number of processor).
+  </description>
+</property>
+
+<property>
   <name>dfs.image.compress</name>
   <value>false</value>
   <description>When this value is true, the dfs image will be compressed.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/56893557/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
index f53c21c..581a7a8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
@@ -18,11 +18,16 @@
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import com.google.common.base.Supplier;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.DF;
 import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
@@ -30,6 +35,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.Before;
 import org.junit.Test;
@@ -40,12 +46,16 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeoutException;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DU_RESERVED_PERCENTAGE_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -315,4 +325,56 @@ public class TestFsVolumeList {
         .build();
     assertEquals(600, volume4.getReserved());
   }
-}
+
+  @Test(timeout = 60000)
+  public void testAddRplicaProcessorForAddingReplicaInMap() throws Exception {
+    Configuration cnf = new Configuration();
+    int poolSize = 5;
+    cnf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+    cnf.setInt(
+        DFSConfigKeys.DFS_DATANODE_VOLUMES_REPLICA_ADD_THREADPOOL_SIZE_KEY,
+        poolSize);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(cnf).numDataNodes(1)
+        .storagesPerDatanode(1).build();
+    DistributedFileSystem fs = cluster.getFileSystem();
+    // Generate data blocks.
+    ExecutorService pool = Executors.newFixedThreadPool(10);
+    List<Future<?>> futureList = new ArrayList<>();
+    for (int i = 0; i < 100; i++) {
+      Thread thread = new Thread() {
+        @Override
+        public void run() {
+          for (int j = 0; j < 10; j++) {
+            try {
+              DFSTestUtil.createFile(fs, new Path("File_" + getName() + j), 10,
+                  (short) 1, 0);
+            } catch (IllegalArgumentException | IOException e) {
+              e.printStackTrace();
+            }
+          }
+        }
+      };
+      thread.setName("FileWriter" + i);
+      futureList.add(pool.submit(thread));
+    }
+    // Wait for data generation
+    for (Future<?> f : futureList) {
+      f.get();
+    }
+    fs.close();
+    FsDatasetImpl fsDataset = (FsDatasetImpl) cluster.getDataNodes().get(0)
+        .getFSDataset();
+    ReplicaMap volumeMap = new ReplicaMap(new AutoCloseableLock());
+    RamDiskReplicaTracker ramDiskReplicaMap = RamDiskReplicaTracker
+        .getInstance(conf, fsDataset);
+    FsVolumeImpl vol = (FsVolumeImpl) fsDataset.getFsVolumeReferences().get(0);
+    String bpid = cluster.getNamesystem().getBlockPoolId();
+    // It will create BlockPoolSlice.AddReplicaProcessor task's and lunch in
+    // ForkJoinPool recursively
+    vol.getVolumeMap(bpid, volumeMap, ramDiskReplicaMap);
+    assertTrue("Failed to add all the replica to map", volumeMap.replicas(bpid)
+        .size() == 1000);
+    assertTrue("Fork pool size should be " + poolSize,
+        BlockPoolSlice.getAddReplicaForkPoolSize() == poolSize);
+  }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org