Posted to common-commits@hadoop.apache.org by in...@apache.org on 2018/10/02 21:40:41 UTC
[41/50] [abbrv] hadoop git commit: HDFS-13768. Adding replicas to volume map makes DataNode start slowly. Contributed by Surendra Singh Lilhore.
HDFS-13768. Adding replicas to volume map makes DataNode start slowly. Contributed by Surendra Singh Lilhore.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/56893557
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/56893557
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/56893557
Branch: refs/heads/HDFS-13532
Commit: 5689355783de005ebc604f4403dc5129a286bfca
Parents: f6c5ef9
Author: Yiqun Lin <yq...@apache.org>
Authored: Tue Oct 2 09:43:14 2018 +0800
Committer: Yiqun Lin <yq...@apache.org>
Committed: Tue Oct 2 09:43:14 2018 +0800
----------------------------------------------------------------------
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 3 +
.../hadoop/hdfs/server/datanode/DataNode.java | 2 +-
.../datanode/fsdataset/impl/BlockPoolSlice.java | 177 +++++++++++++++++--
.../datanode/fsdataset/impl/FsDatasetImpl.java | 14 ++
.../datanode/fsdataset/impl/FsDatasetUtil.java | 30 ++--
.../datanode/fsdataset/impl/FsVolumeList.java | 5 +-
.../datanode/fsdataset/impl/ReplicaMap.java | 25 +++
.../src/main/resources/hdfs-default.xml | 9 +
.../fsdataset/impl/TestFsVolumeList.java | 64 ++++++-
9 files changed, 300 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/56893557/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index a7e7b9b..d8024dc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -365,6 +365,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final long DFS_CONTENT_SUMMARY_SLEEP_MICROSEC_DEFAULT = 500;
public static final String DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY = "dfs.datanode.failed.volumes.tolerated";
public static final int DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT = 0;
+ public static final String
+ DFS_DATANODE_VOLUMES_REPLICA_ADD_THREADPOOL_SIZE_KEY =
+ "dfs.datanode.volumes.replica-add.threadpool.size";
public static final String DFS_DATANODE_SYNCONCLOSE_KEY = "dfs.datanode.synconclose";
public static final boolean DFS_DATANODE_SYNCONCLOSE_DEFAULT = false;
public static final String DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY = "dfs.datanode.socket.reuse.keepalive";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/56893557/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index c980395..270e30b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -1695,7 +1695,7 @@ public class DataNode extends ReconfigurableBase
return blockPoolManager.get(bpid);
}
- int getBpOsCount() {
+ public int getBpOsCount() {
return blockPoolManager.getAllNamenodeThreads().size();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/56893557/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index 2adfb6b..b9b581f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -28,8 +28,19 @@ import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.io.RandomAccessFile;
import java.io.Writer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
import java.util.Scanner;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinTask;
+import java.util.concurrent.RecursiveAction;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
@@ -52,8 +63,8 @@ import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTracker.RamDiskReplica;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
-
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DiskChecker;
@@ -96,6 +107,17 @@ class BlockPoolSlice {
private final int maxDataLength;
private final FileIoProvider fileIoProvider;
+ private static ForkJoinPool addReplicaThreadPool = null;
+ private static final int VOLUMES_REPLICA_ADD_THREADPOOL_SIZE = Runtime
+ .getRuntime().availableProcessors();
+ private static final Comparator<File> FILE_COMPARATOR =
+ new Comparator<File>() {
+ @Override
+ public int compare(File f1, File f2) {
+ return f1.getName().compareTo(f2.getName());
+ }
+ };
+
// TODO:FEDERATION scalability issue - a thread per DU is needed
private final GetSpaceUsed dfsUsage;
@@ -161,13 +183,15 @@ class BlockPoolSlice {
.setConf(conf)
.setInitialUsed(loadDfsUsed())
.build();
-
+ // Initialize the add-replica fork/join pool.
+ initializeAddReplicaPool(conf);
// Make the dfs usage to be saved during shutdown.
shutdownHook = new Runnable() {
@Override
public void run() {
if (!dfsUsedSaved) {
saveDfsUsed();
+ addReplicaThreadPool.shutdownNow();
}
}
};
@@ -175,6 +199,21 @@ class BlockPoolSlice {
SHUTDOWN_HOOK_PRIORITY);
}
+ private synchronized void initializeAddReplicaPool(Configuration conf) {
+ if (addReplicaThreadPool == null) {
+ FsDatasetImpl dataset = (FsDatasetImpl) volume.getDataset();
+ int numberOfBlockPoolSlice = dataset.getVolumeCount()
+ * dataset.getBPServiceCount();
+ int poolsize = Math.max(numberOfBlockPoolSlice,
+ VOLUMES_REPLICA_ADD_THREADPOOL_SIZE);
+ // Default pool size is the max of (number of volumes * number of
+ // BP services) and the number of processors.
+ addReplicaThreadPool = new ForkJoinPool(conf.getInt(
+ DFSConfigKeys.DFS_DATANODE_VOLUMES_REPLICA_ADD_THREADPOOL_SIZE_KEY,
+ poolsize));
+ }
+ }
+
File getDirectory() {
return currentDir.getParentFile();
}
@@ -374,10 +413,55 @@ class BlockPoolSlice {
boolean success = readReplicasFromCache(volumeMap, lazyWriteReplicaMap);
if (!success) {
+ List<IOException> exceptions = Collections
+ .synchronizedList(new ArrayList<IOException>());
+ Queue<RecursiveAction> subTaskQueue =
+ new ConcurrentLinkedQueue<RecursiveAction>();
+
// add finalized replicas
- addToReplicasMap(volumeMap, finalizedDir, lazyWriteReplicaMap, true);
+ AddReplicaProcessor task = new AddReplicaProcessor(volumeMap,
+ finalizedDir, lazyWriteReplicaMap, true, exceptions, subTaskQueue);
+ ForkJoinTask<Void> finalizedTask = addReplicaThreadPool.submit(task);
+
// add rbw replicas
- addToReplicasMap(volumeMap, rbwDir, lazyWriteReplicaMap, false);
+ task = new AddReplicaProcessor(volumeMap, rbwDir, lazyWriteReplicaMap,
+ false, exceptions, subTaskQueue);
+ ForkJoinTask<Void> rbwTask = addReplicaThreadPool.submit(task);
+
+ try {
+ finalizedTask.get();
+ rbwTask.get();
+ } catch (InterruptedException | ExecutionException e) {
+ exceptions.add(new IOException(
+ "Failed to start sub tasks to add replica in replica map :"
+ + e.getMessage()));
+ }
+
+ // Wait for all the sub tasks to finish.
+ waitForSubTaskToFinish(subTaskQueue, exceptions);
+ }
+ }
+
+ /**
+ * Wait until all the recursive tasks for adding replicas to the volume
+ * map have completed.
+ *
+ * @param subTaskQueue
+ * {@link AddReplicaProcessor} tasks list.
+ * @param exceptions
+ * exceptions that occurred in sub tasks.
+ * @throws IOException
+ * thrown if one or more sub tasks failed.
+ */
+ private void waitForSubTaskToFinish(Queue<RecursiveAction> subTaskQueue,
+ List<IOException> exceptions) throws IOException {
+ while (!subTaskQueue.isEmpty()) {
+ RecursiveAction task = subTaskQueue.poll();
+ if (task != null) {
+ task.join();
+ }
+ }
+ if (!exceptions.isEmpty()) {
+ throw MultipleIOException.createIOException(exceptions);
}
}
@@ -526,10 +610,10 @@ class BlockPoolSlice {
}
}
- ReplicaInfo oldReplica = volumeMap.get(bpid, newReplica.getBlockId());
- if (oldReplica == null) {
- volumeMap.add(bpid, newReplica);
- } else {
+ ReplicaInfo tmpReplicaInfo = volumeMap.addAndGet(bpid, newReplica);
+ ReplicaInfo oldReplica = (tmpReplicaInfo == newReplica) ? null
+ : tmpReplicaInfo;
+ if (oldReplica != null) {
// We have multiple replicas of the same block so decide which one
// to keep.
newReplica = resolveDuplicateReplicas(newReplica, oldReplica, volumeMap);
@@ -558,15 +642,23 @@ class BlockPoolSlice {
* storage.
* @param isFinalized true if the directory has finalized replicas;
* false if the directory has rbw replicas
+ * @param exceptions list of exceptions to be returned to the parent thread.
+ * @param subTaskQueue queue of sub tasks
*/
void addToReplicasMap(ReplicaMap volumeMap, File dir,
- final RamDiskReplicaTracker lazyWriteReplicaMap,
- boolean isFinalized)
+ final RamDiskReplicaTracker lazyWriteReplicaMap, boolean isFinalized,
+ List<IOException> exceptions, Queue<RecursiveAction> subTaskQueue)
throws IOException {
File[] files = fileIoProvider.listFiles(volume, dir);
- for (File file : files) {
+ Arrays.sort(files, FILE_COMPARATOR);
+ for (int i = 0; i < files.length; i++) {
+ File file = files[i];
if (file.isDirectory()) {
- addToReplicasMap(volumeMap, file, lazyWriteReplicaMap, isFinalized);
+ // Launch new sub task.
+ AddReplicaProcessor subTask = new AddReplicaProcessor(volumeMap, file,
+ lazyWriteReplicaMap, isFinalized, exceptions, subTaskQueue);
+ subTask.fork();
+ subTaskQueue.add(subTask);
}
if (isFinalized && FsDatasetUtil.isUnlinkTmpFile(file)) {
@@ -581,7 +673,7 @@ class BlockPoolSlice {
}
long genStamp = FsDatasetUtil.getGenerationStampFromFile(
- files, file);
+ files, file, i);
long blockId = Block.filename2id(file.getName());
Block block = new Block(blockId, file.length(), genStamp);
addReplicaToReplicasMap(block, volumeMap, lazyWriteReplicaMap,
@@ -886,4 +978,63 @@ class BlockPoolSlice {
public long getNumOfBlocks() {
return numOfBlocks.get();
}
+
+ /**
+ * Recursive action for adding replicas to the map.
+ */
+ class AddReplicaProcessor extends RecursiveAction {
+
+ private ReplicaMap volumeMap;
+ private File dir;
+ private RamDiskReplicaTracker lazyWriteReplicaMap;
+ private boolean isFinalized;
+ private List<IOException> exceptions;
+ private Queue<RecursiveAction> subTaskQueue;
+
+ /**
+ * @param volumeMap
+ * the replicas map
+ * @param dir
+ * an input directory
+ * @param lazyWriteReplicaMap
+ * Map of replicas on transient storage.
+ * @param isFinalized
+ * true if the directory has finalized replicas; false if the
+ * directory has rbw replicas
+ * @param exceptions
+ * List of exceptions to be returned to the parent thread.
+ * @param subTaskQueue
+ * queue of sub tasks
+ */
+ AddReplicaProcessor(ReplicaMap volumeMap, File dir,
+ RamDiskReplicaTracker lazyWriteReplicaMap, boolean isFinalized,
+ List<IOException> exceptions, Queue<RecursiveAction> subTaskQueue) {
+ this.volumeMap = volumeMap;
+ this.dir = dir;
+ this.lazyWriteReplicaMap = lazyWriteReplicaMap;
+ this.isFinalized = isFinalized;
+ this.exceptions = exceptions;
+ this.subTaskQueue = subTaskQueue;
+ }
+
+ @Override
+ protected void compute() {
+ try {
+ addToReplicasMap(volumeMap, dir, lazyWriteReplicaMap, isFinalized,
+ exceptions, subTaskQueue);
+ } catch (IOException e) {
+ LOG.warn("Caught exception while adding replicas from " + volume
+ + " in subtask. Will throw later.", e);
+ exceptions.add(e);
+ }
+ }
+ }
+
+ /**
+ * Return the size of the fork pool used for adding replicas to the map.
+ */
+ @VisibleForTesting
+ public static int getAddReplicaForkPoolSize() {
+ return addReplicaThreadPool.getPoolSize();
+ }
}
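
For readers less familiar with the java.util.concurrent fork/join API, here is a minimal, self-contained sketch of the same fork/record/join pattern AddReplicaProcessor uses. All names here (DirWalker, fileCount, etc.) are hypothetical; the real task additionally parses block files, computes generation stamps, and resolves duplicate replicas.

import java.io.File;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.atomic.AtomicLong;

public class DirWalker extends RecursiveAction {
  private final File dir;
  private final Queue<RecursiveAction> subTasks; // shared across all tasks
  private final AtomicLong fileCount;            // stands in for the replica map

  DirWalker(File dir, Queue<RecursiveAction> subTasks, AtomicLong fileCount) {
    this.dir = dir;
    this.subTasks = subTasks;
    this.fileCount = fileCount;
  }

  @Override
  protected void compute() {
    File[] files = dir.listFiles();
    if (files == null) {
      return;
    }
    for (File f : files) {
      if (f.isDirectory()) {
        // Fork a subtask per subdirectory and record it so the caller
        // can join it later, exactly like subTaskQueue in the patch.
        DirWalker sub = new DirWalker(f, subTasks, fileCount);
        sub.fork();
        subTasks.add(sub);
      } else {
        fileCount.incrementAndGet();
      }
    }
  }

  public static void main(String[] args) throws Exception {
    ForkJoinPool pool = new ForkJoinPool();
    Queue<RecursiveAction> subTasks = new ConcurrentLinkedQueue<>();
    AtomicLong count = new AtomicLong();
    DirWalker root = new DirWalker(new File("."), subTasks, count);
    pool.submit(root).get();   // wait for the root task
    RecursiveAction t;
    while ((t = subTasks.poll()) != null) {
      t.join();                // then drain and join every forked subtask
    }
    System.out.println("files: " + count.get());
    pool.shutdown();
  }
}

As in the patch, every forked subtask is recorded in a shared queue because a RecursiveAction does not wait for children it forked but never joined inside compute(); joining only the root task would return before the subtasks finish.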
http://git-wip-us.apache.org/repos/asf/hadoop/blob/56893557/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 27196c2..027a0bf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -3307,6 +3307,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
this.timer = newTimer;
}
+ /**
+ * Return the number of BP services.
+ */
+ public int getBPServiceCount() {
+ return datanode.getBpOsCount();
+ }
+
+ /**
+ * Return the number of volumes.
+ */
+ public int getVolumeCount() {
+ return volumes.getVolumes().size();
+ }
+
void stopAllDataxceiverThreads(FsVolumeImpl volume) {
try (AutoCloseableLock lock = datasetLock.acquire()) {
for (String blockPoolId : volumeMap.getBlockPoolList()) {
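
The two getters above exist so BlockPoolSlice can size its add-replica pool. A small standalone sketch of that default sizing, with assumed example counts for volumes and BP services:

public class PoolSizeSketch {
  public static void main(String[] args) {
    int volumes = 12;    // assumed: a DataNode with 12 data directories
    int bpServices = 2;  // assumed: a federated cluster with 2 namespaces
    int processors = Runtime.getRuntime().availableProcessors();
    // Default = max(#volumes * #BP services, #processors), unless
    // dfs.datanode.volumes.replica-add.threadpool.size overrides it.
    int poolSize = Math.max(volumes * bpServices, processors);
    System.out.println("default add-replica pool size: " + poolSize);
  }
}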
http://git-wip-us.apache.org/repos/asf/hadoop/blob/56893557/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
index 9f115a0..8a3b237 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetUtil.java
@@ -117,21 +117,27 @@ public class FsDatasetUtil {
}
/**
- * Find the meta-file for the specified block file
- * and then return the generation stamp from the name of the meta-file.
+ * Find the meta-file for the specified block file and then return the
+ * generation stamp from the name of the meta-file. Generally the meta file
+ * will be the next file in the sorted array of files.
+ *
+ * @param listdir
+ * sorted list of files, ordered by name.
+ * @param blockFile
+ * block file for which generation stamp is needed.
+ * @param index
+ * index of block file in array.
+ * @return generation stamp for block file.
*/
- static long getGenerationStampFromFile(File[] listdir, File blockFile)
- throws IOException {
+ static long getGenerationStampFromFile(File[] listdir, File blockFile,
+ int index) {
String blockName = blockFile.getName();
- for (int j = 0; j < listdir.length; j++) {
- String path = listdir[j].getName();
- if (!path.startsWith(blockName)) {
- continue;
- }
- if (blockFile.getCanonicalPath().equals(listdir[j].getCanonicalPath())) {
- continue;
+ if ((index + 1) < listdir.length) {
+ // Check if the next file in the sorted listing is the meta file.
+ String metaFile = listdir[index + 1].getName();
+ if (metaFile.startsWith(blockName)) {
+ return Block.getGenerationStamp(metaFile);
}
- return Block.getGenerationStamp(listdir[j].getName());
}
FsDatasetImpl.LOG.warn("Block " + blockFile + " does not have a metafile!");
return HdfsConstants.GRANDFATHER_GENERATION_STAMP;
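
A standalone illustration, with assumed file names, of why the index-based lookup works: after Arrays.sort with the name comparator, a block file sorts immediately before its own meta file, so the meta file for listdir[index] is normally listdir[index + 1].

public class GenStampSketch {
  public static void main(String[] args) {
    // A sorted "current" directory listing (names are made up).
    String[] listdir = {
        "blk_1073741825", "blk_1073741825_1001.meta",
        "blk_1073741826", "blk_1073741826_1002.meta",
    };
    int index = 2; // position of block file "blk_1073741826"
    String blockName = listdir[index];
    if (index + 1 < listdir.length
        && listdir[index + 1].startsWith(blockName)) {
      // The real code delegates to Block.getGenerationStamp(metaFile).
      String meta = listdir[index + 1];
      System.out.println(meta.substring(blockName.length() + 1,
          meta.lastIndexOf('.'))); // prints 1002
    }
  }
}

This replaces the old linear scan, and its getCanonicalPath() call per candidate, with a single adjacent-index check, which also contributes to the startup speedup.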
http://git-wip-us.apache.org/repos/asf/hadoop/blob/56893557/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
index a1804ae..a0fcb54 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
@@ -226,8 +226,9 @@ class FsVolumeList {
throw exceptions.get(0);
}
long totalTimeTaken = Time.monotonicNow() - totalStartTime;
- FsDatasetImpl.LOG.info("Total time to add all replicas to map: "
- + totalTimeTaken + "ms");
+ FsDatasetImpl.LOG
+ .info("Total time to add all replicas to map for block pool " + bpid
+ + ": " + totalTimeTaken + "ms");
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/56893557/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
index d4fb69b..c630b95 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
@@ -136,6 +136,31 @@ class ReplicaMap {
}
/**
+ * Add a replica's meta information to the map; if a replica with the same
+ * block id already exists, return the old ReplicaInfo instead.
+ */
+ ReplicaInfo addAndGet(String bpid, ReplicaInfo replicaInfo) {
+ checkBlockPool(bpid);
+ checkBlock(replicaInfo);
+ try (AutoCloseableLock l = lock.acquire()) {
+ FoldedTreeSet<ReplicaInfo> set = map.get(bpid);
+ if (set == null) {
+ // Add an entry for block pool if it does not exist already
+ set = new FoldedTreeSet<>();
+ map.put(bpid, set);
+ }
+ ReplicaInfo oldReplicaInfo = set.get(replicaInfo.getBlockId(),
+ LONG_AND_BLOCK_COMPARATOR);
+ if (oldReplicaInfo != null) {
+ return oldReplicaInfo;
+ } else {
+ set.add(replicaInfo);
+ }
+ return replicaInfo;
+ }
+ }
+
+ /**
* Add all entries from the given replica map into the local replica map.
*/
void addAll(ReplicaMap other) {
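
The new addAndGet collapses the earlier get-then-add sequence into one operation under the lock, so two scanner threads racing on the same block id cannot both insert. Its return convention resembles ConcurrentMap.putIfAbsent; a minimal sketch under that analogy (types and values here are illustrative, not from the patch):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class AddAndGetSketch {
  public static void main(String[] args) {
    ConcurrentMap<Long, String> replicas = new ConcurrentHashMap<>();
    long blockId = 1073741825L;
    // putIfAbsent returns null on a fresh insert, or the existing value
    // if another thread won the race. addAndGet similarly returns the
    // new replica on insert and the old replica on a conflict, which the
    // caller then feeds into resolveDuplicateReplicas().
    String old = replicas.putIfAbsent(blockId, "replica-from-scanner-A");
    System.out.println(old == null ? "inserted" : "duplicate: " + old);
  }
}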
http://git-wip-us.apache.org/repos/asf/hadoop/blob/56893557/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 1573582..2ee8399 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -1282,6 +1282,15 @@
</property>
<property>
+ <name>dfs.datanode.volumes.replica-add.threadpool.size</name>
+ <value></value>
+ <description>Specifies the maximum number of threads to use for
+ adding blocks to the volume map. The default value is the max of
+ (number of volumes * number of BP services) and the number of processors.
+ </description>
+</property>
+
+<property>
<name>dfs.image.compress</name>
<value>false</value>
<description>When this value is true, the dfs image will be compressed.
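
The new property ships with an empty value, so the computed default applies unless a deployment overrides it. A hedged sketch of setting an explicit cap programmatically (the value 8 is an arbitrary example):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class ReplicaAddPoolConfigExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Cap the add-replica fork/join pool at 8 threads.
    conf.setInt(
        DFSConfigKeys.DFS_DATANODE_VOLUMES_REPLICA_ADD_THREADPOOL_SIZE_KEY,
        8);
    System.out.println(conf.getInt(
        DFSConfigKeys.DFS_DATANODE_VOLUMES_REPLICA_ADD_THREADPOOL_SIZE_KEY,
        -1)); // prints 8
  }
}

The same key can equally be set in hdfs-site.xml on each DataNode.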
http://git-wip-us.apache.org/repos/asf/hadoop/blob/56893557/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
index f53c21c..581a7a8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java
@@ -18,11 +18,16 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import com.google.common.base.Supplier;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DF;
import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
@@ -30,6 +35,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.StringUtils;
import org.junit.Before;
import org.junit.Test;
@@ -40,12 +46,16 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DU_RESERVED_PERCENTAGE_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -315,4 +325,56 @@ public class TestFsVolumeList {
.build();
assertEquals(600, volume4.getReserved());
}
-}
+
+ @Test(timeout = 60000)
+ public void testAddRplicaProcessorForAddingReplicaInMap() throws Exception {
+ Configuration cnf = new Configuration();
+ int poolSize = 5;
+ cnf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+ cnf.setInt(
+ DFSConfigKeys.DFS_DATANODE_VOLUMES_REPLICA_ADD_THREADPOOL_SIZE_KEY,
+ poolSize);
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(cnf).numDataNodes(1)
+ .storagesPerDatanode(1).build();
+ DistributedFileSystem fs = cluster.getFileSystem();
+ // Generate data blocks.
+ ExecutorService pool = Executors.newFixedThreadPool(10);
+ List<Future<?>> futureList = new ArrayList<>();
+ for (int i = 0; i < 100; i++) {
+ Thread thread = new Thread() {
+ @Override
+ public void run() {
+ for (int j = 0; j < 10; j++) {
+ try {
+ DFSTestUtil.createFile(fs, new Path("File_" + getName() + j), 10,
+ (short) 1, 0);
+ } catch (IllegalArgumentException | IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ };
+ thread.setName("FileWriter" + i);
+ futureList.add(pool.submit(thread));
+ }
+ // Wait for data generation
+ for (Future<?> f : futureList) {
+ f.get();
+ }
+ fs.close();
+ FsDatasetImpl fsDataset = (FsDatasetImpl) cluster.getDataNodes().get(0)
+ .getFSDataset();
+ ReplicaMap volumeMap = new ReplicaMap(new AutoCloseableLock());
+ RamDiskReplicaTracker ramDiskReplicaMap = RamDiskReplicaTracker
+ .getInstance(conf, fsDataset);
+ FsVolumeImpl vol = (FsVolumeImpl) fsDataset.getFsVolumeReferences().get(0);
+ String bpid = cluster.getNamesystem().getBlockPoolId();
+ // It will create BlockPoolSlice.AddReplicaProcessor tasks and launch them
+ // in the ForkJoinPool recursively.
+ vol.getVolumeMap(bpid, volumeMap, ramDiskReplicaMap);
+ assertTrue("Failed to add all the replica to map", volumeMap.replicas(bpid)
+ .size() == 1000);
+ assertTrue("Fork pool size should be " + poolSize,
+ BlockPoolSlice.getAddReplicaForkPoolSize() == poolSize);
+ }
+}
\ No newline at end of file