You are viewing a plain-text version of this content. The canonical link to the original message was a hyperlink that is not preserved in this copy.
Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2016/02/23 00:02:07 UTC
hadoop git commit: HDFS-8578. On upgrade, Datanode should process all storage/data dirs in parallel. Contributed by vinayakumarb and szetszwo
Repository: hadoop
Updated Branches:
refs/heads/trunk a2fdfff02 -> 66289a3bf
HDFS-8578. On upgrade, Datanode should process all storage/data dirs in parallel. Contributed by vinayakumarb and szetszwo
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/66289a3b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/66289a3b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/66289a3b
Branch: refs/heads/trunk
Commit: 66289a3bf403f307844ea0b6ceed35b603d12c0b
Parents: a2fdfff
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Mon Feb 22 15:01:15 2016 -0800
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Mon Feb 22 15:01:15 2016 -0800
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 3 +
.../server/datanode/BlockPoolSliceStorage.java | 56 ++++---
.../hdfs/server/datanode/DataStorage.java | 146 +++++++++++++++----
4 files changed, 160 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/66289a3b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index c39bcee..256df39 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -2840,6 +2840,9 @@ Release 2.7.3 - UNRELEASED
HDFS-4946. Allow preferLocalNode in BlockPlacementPolicyDefault to be
configurable (James Kinley and Nathan Roberts via kihwal)
+ HDFS-8578. On upgrade, Datanode should process all storage/data dirs in
+ parallel. (vinayakumarb and szetszwo via szetszwo)
+
OPTIMIZATIONS
BUG FIXES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/66289a3b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 39e4fe4..a055e2b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -836,6 +836,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.datanode.slow.io.warning.threshold.ms";
public static final long DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_DEFAULT = 300;
+ // Number of parallel threads to load multiple datanode volumes
+ public static final String DFS_DATANODE_PARALLEL_VOLUME_LOAD_THREADS_NUM_KEY =
+ "dfs.datanode.parallel.volumes.load.threads.num";
public static final String DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS_KEY =
"dfs.datanode.block.id.layout.upgrade.threads";
public static final int DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS = 12;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/66289a3b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
index acf10f1..90a4669 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
@@ -27,6 +27,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -145,7 +146,8 @@ public class BlockPoolSliceStorage extends Storage {
* @throws IOException
*/
private StorageDirectory loadStorageDirectory(NamespaceInfo nsInfo,
- File dataDir, StartupOption startOpt, Configuration conf)
+ File dataDir, StartupOption startOpt,
+ List<Callable<StorageDirectory>> callables, Configuration conf)
throws IOException {
StorageDirectory sd = new StorageDirectory(dataDir, null, true);
try {
@@ -172,19 +174,17 @@ public class BlockPoolSliceStorage extends Storage {
// Each storage directory is treated individually.
// During startup some of them can upgrade or roll back
// while others could be up-to-date for the regular startup.
- if (doTransition(sd, nsInfo, startOpt, conf)) {
- return sd;
- }
+ if (!doTransition(sd, nsInfo, startOpt, callables, conf)) {
- if (getCTime() != nsInfo.getCTime()) {
- throw new IOException("Datanode CTime (=" + getCTime()
- + ") is not equal to namenode CTime (=" + nsInfo.getCTime() + ")");
+ // 3. Check CTime and update successfully loaded storage.
+ if (getCTime() != nsInfo.getCTime()) {
+ throw new IOException("Datanode CTime (=" + getCTime()
+ + ") is not equal to namenode CTime (=" + nsInfo.getCTime() + ")");
+ }
+ setServiceLayoutVersion(getServiceLayoutVersion());
+ writeProperties(sd);
}
- // 3. Update successfully loaded storage.
- setServiceLayoutVersion(getServiceLayoutVersion());
- writeProperties(sd);
-
return sd;
} catch (IOException ioe) {
sd.unlock();
@@ -208,7 +208,8 @@ public class BlockPoolSliceStorage extends Storage {
*/
List<StorageDirectory> loadBpStorageDirectories(NamespaceInfo nsInfo,
Collection<File> dataDirs, StartupOption startOpt,
- Configuration conf) throws IOException {
+ List<Callable<StorageDirectory>> callables, Configuration conf)
+ throws IOException {
List<StorageDirectory> succeedDirs = Lists.newArrayList();
try {
for (File dataDir : dataDirs) {
@@ -218,7 +219,7 @@ public class BlockPoolSliceStorage extends Storage {
"attempt to load an used block storage: " + dataDir);
}
final StorageDirectory sd = loadStorageDirectory(
- nsInfo, dataDir, startOpt, conf);
+ nsInfo, dataDir, startOpt, callables, conf);
succeedDirs.add(sd);
}
} catch (IOException e) {
@@ -242,11 +243,12 @@ public class BlockPoolSliceStorage extends Storage {
* @throws IOException on error
*/
List<StorageDirectory> recoverTransitionRead(NamespaceInfo nsInfo,
- Collection<File> dataDirs, StartupOption startOpt, Configuration conf)
+ Collection<File> dataDirs, StartupOption startOpt,
+ List<Callable<StorageDirectory>> callables, Configuration conf)
throws IOException {
LOG.info("Analyzing storage directories for bpid " + nsInfo.getBlockPoolID());
final List<StorageDirectory> loaded = loadBpStorageDirectories(
- nsInfo, dataDirs, startOpt, conf);
+ nsInfo, dataDirs, startOpt, callables, conf);
for (StorageDirectory sd : loaded) {
addStorageDir(sd);
}
@@ -353,7 +355,8 @@ public class BlockPoolSliceStorage extends Storage {
* @return true if the new properties has been written.
*/
private boolean doTransition(StorageDirectory sd, NamespaceInfo nsInfo,
- StartupOption startOpt, Configuration conf) throws IOException {
+ StartupOption startOpt, List<Callable<StorageDirectory>> callables,
+ Configuration conf) throws IOException {
if (startOpt == StartupOption.ROLLBACK && sd.getPreviousDir().exists()) {
Preconditions.checkState(!getTrashRootDir(sd).exists(),
sd.getPreviousDir() + " and " + getTrashRootDir(sd) + " should not " +
@@ -395,7 +398,7 @@ public class BlockPoolSliceStorage extends Storage {
}
if (this.layoutVersion > HdfsServerConstants.DATANODE_LAYOUT_VERSION
|| this.cTime < nsInfo.getCTime()) {
- doUpgrade(sd, nsInfo, conf); // upgrade
+ doUpgrade(sd, nsInfo, callables, conf); // upgrade
return true;
}
// layoutVersion == LAYOUT_VERSION && this.cTime > nsInfo.cTime
@@ -425,7 +428,9 @@ public class BlockPoolSliceStorage extends Storage {
* @throws IOException on error
*/
private void doUpgrade(final StorageDirectory bpSd,
- final NamespaceInfo nsInfo, final Configuration conf) throws IOException {
+ final NamespaceInfo nsInfo,
+ final List<Callable<StorageDirectory>> callables,
+ final Configuration conf) throws IOException {
// Upgrading is applicable only to release with federation or after
if (!DataNodeLayoutVersion.supports(
LayoutVersion.Feature.FEDERATION, layoutVersion)) {
@@ -463,10 +468,21 @@ public class BlockPoolSliceStorage extends Storage {
rename(bpCurDir, bpTmpDir);
final String name = "block pool " + blockpoolID + " at " + bpSd.getRoot();
- doUgrade(name, bpSd, nsInfo, bpPrevDir, bpTmpDir, bpCurDir, oldLV, conf);
+ if (callables == null) {
+ doUpgrade(name, bpSd, nsInfo, bpPrevDir, bpTmpDir, bpCurDir, oldLV, conf);
+ } else {
+ callables.add(new Callable<StorageDirectory>() {
+ @Override
+ public StorageDirectory call() throws Exception {
+ doUpgrade(name, bpSd, nsInfo, bpPrevDir, bpTmpDir, bpCurDir, oldLV,
+ conf);
+ return bpSd;
+ }
+ });
+ }
}
- private void doUgrade(String name, final StorageDirectory bpSd,
+ private void doUpgrade(String name, final StorageDirectory bpSd,
NamespaceInfo nsInfo, final File bpPrevDir, final File bpTmpDir,
final File bpCurDir, final int oldLV, Configuration conf)
throws IOException {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/66289a3b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
index 7903194..6697054 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
@@ -37,10 +37,12 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@@ -51,6 +53,7 @@ import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
@@ -260,8 +263,8 @@ public class DataStorage extends Storage {
}
private StorageDirectory loadStorageDirectory(DataNode datanode,
- NamespaceInfo nsInfo, File dataDir, StartupOption startOpt)
- throws IOException {
+ NamespaceInfo nsInfo, File dataDir, StartupOption startOpt,
+ List<Callable<StorageDirectory>> callables) throws IOException {
StorageDirectory sd = new StorageDirectory(dataDir, null, false);
try {
StorageState curState = sd.analyzeStorage(startOpt, this);
@@ -287,13 +290,12 @@ public class DataStorage extends Storage {
// Each storage directory is treated individually.
// During startup some of them can upgrade or roll back
// while others could be up-to-date for the regular startup.
- if (doTransition(sd, nsInfo, startOpt, datanode.getConf())) {
- return sd;
- }
+ if (!doTransition(sd, nsInfo, startOpt, callables, datanode.getConf())) {
- // 3. Update successfully loaded storage.
- setServiceLayoutVersion(getServiceLayoutVersion());
- writeProperties(sd);
+ // 3. Update successfully loaded storage.
+ setServiceLayoutVersion(getServiceLayoutVersion());
+ writeProperties(sd);
+ }
return sd;
} catch (IOException ioe) {
@@ -325,7 +327,7 @@ public class DataStorage extends Storage {
}
StorageDirectory sd = loadStorageDirectory(
- datanode, nsInfos.get(0), volume, StartupOption.HOTSWAP);
+ datanode, nsInfos.get(0), volume, StartupOption.HOTSWAP, null);
VolumeBuilder builder =
new VolumeBuilder(this, sd);
for (NamespaceInfo nsInfo : nsInfos) {
@@ -336,12 +338,35 @@ public class DataStorage extends Storage {
final BlockPoolSliceStorage bpStorage = getBlockPoolSliceStorage(nsInfo);
final List<StorageDirectory> dirs = bpStorage.loadBpStorageDirectories(
- nsInfo, bpDataDirs, StartupOption.HOTSWAP, datanode.getConf());
+ nsInfo, bpDataDirs, StartupOption.HOTSWAP, null, datanode.getConf());
builder.addBpStorageDirectories(nsInfo.getBlockPoolID(), dirs);
}
return builder;
}
+ static int getParallelVolumeLoadThreadsNum(int dataDirs, Configuration conf) { // Pool size for loading volumes: the configured value (must be >= 1, else HadoopIllegalArgumentException), capped at dataDirs.
+ final String key
+ = DFSConfigKeys.DFS_DATANODE_PARALLEL_VOLUME_LOAD_THREADS_NUM_KEY;
+ final int n = conf.getInt(key, dataDirs); // If the key is unset, default to one thread per data dir.
+ if (n < 1) { // NOTE(review): an empty dataDirs with the key unset gives n == 0 and throws here; confirm callers never pass an empty collection.
+ throw new HadoopIllegalArgumentException(key + " = " + n + " < 1");
+ }
+ final int min = Math.min(n, dataDirs); // NOTE(review): is 0 when dataDirs == 0, which Executors.newFixedThreadPool rejects.
+ LOG.info("Using " + min + " threads to upgrade data directories (" // NOTE(review): logged on every call (e.g. every addStorageLocations), not only on upgrade.
+ + key + "=" + n + ", dataDirs=" + dataDirs + ")");
+ return min;
+ }
+
+ static class UpgradeTask { // Pairs a data dir with the Future of one deferred upgrade Callable so results and failures are reported per dir.
+ private final StorageLocation dataDir; // Named in failure warnings; loadDataStorage also records it as a successful location.
+ private final Future<StorageDirectory> future; // From executor.submit(callable); get() yields the upgraded StorageDirectory or throws ExecutionException.
+
+ UpgradeTask(StorageLocation dataDir, Future<StorageDirectory> future) {
+ this.dataDir = dataDir;
+ this.future = future;
+ }
+ }
+
/**
* Add a list of volumes to be managed by DataStorage. If the volume is empty,
* format it, otherwise recover it from previous transitions if required.
@@ -356,32 +381,62 @@ public class DataStorage extends Storage {
synchronized List<StorageDirectory> addStorageLocations(DataNode datanode,
NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
StartupOption startOpt) throws IOException {
- final List<StorageLocation> successLocations = loadDataStorage(
- datanode, nsInfo, dataDirs, startOpt);
- return loadBlockPoolSliceStorage(
- datanode, nsInfo, successLocations, startOpt);
+ final int numThreads = getParallelVolumeLoadThreadsNum(
+ dataDirs.size(), datanode.getConf());
+ final ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+ try {
+ final List<StorageLocation> successLocations = loadDataStorage(
+ datanode, nsInfo, dataDirs, startOpt, executor);
+ return loadBlockPoolSliceStorage(
+ datanode, nsInfo, successLocations, startOpt, executor);
+ } finally {
+ executor.shutdown();
+ }
}
private List<StorageLocation> loadDataStorage(DataNode datanode,
NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
- StartupOption startOpt) throws IOException {
+ StartupOption startOpt, ExecutorService executor) throws IOException {
final List<StorageLocation> success = Lists.newArrayList();
+ final List<UpgradeTask> tasks = Lists.newArrayList();
for (StorageLocation dataDir : dataDirs) {
File root = dataDir.getFile();
if (!containsStorageDir(root)) {
try {
// It first ensures the datanode level format is completed.
+ final List<Callable<StorageDirectory>> callables
+ = Lists.newArrayList();
final StorageDirectory sd = loadStorageDirectory(
- datanode, nsInfo, root, startOpt);
- addStorageDir(sd);
+ datanode, nsInfo, root, startOpt, callables);
+ if (callables.isEmpty()) {
+ addStorageDir(sd);
+ success.add(dataDir);
+ } else {
+ for(Callable<StorageDirectory> c : callables) {
+ tasks.add(new UpgradeTask(dataDir, executor.submit(c)));
+ }
+ }
} catch (IOException e) {
LOG.warn("Failed to add storage directory " + dataDir, e);
- continue;
}
} else {
LOG.info("Storage directory " + dataDir + " has already been used.");
+ success.add(dataDir);
+ }
+ }
+
+ if (!tasks.isEmpty()) {
+ LOG.info("loadDataStorage: " + tasks.size() + " upgrade tasks");
+ for(UpgradeTask t : tasks) {
+ try {
+ addStorageDir(t.future.get());
+ success.add(t.dataDir);
+ } catch (ExecutionException e) {
+ LOG.warn("Failed to upgrade storage directory " + t.dataDir, e);
+ } catch (InterruptedException e) {
+ throw DFSUtilClient.toInterruptedIOException("Task interrupted", e);
+ }
}
- success.add(dataDir);
}
return success;
@@ -389,10 +444,11 @@ public class DataStorage extends Storage {
private List<StorageDirectory> loadBlockPoolSliceStorage(DataNode datanode,
NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
- StartupOption startOpt) throws IOException {
+ StartupOption startOpt, ExecutorService executor) throws IOException {
final String bpid = nsInfo.getBlockPoolID();
final BlockPoolSliceStorage bpStorage = getBlockPoolSliceStorage(nsInfo);
final List<StorageDirectory> success = Lists.newArrayList();
+ final List<UpgradeTask> tasks = Lists.newArrayList();
for (StorageLocation dataDir : dataDirs) {
final File curDir = new File(dataDir.getFile(), STORAGE_DIR_CURRENT);
List<File> bpDataDirs = new ArrayList<File>();
@@ -400,10 +456,17 @@ public class DataStorage extends Storage {
try {
makeBlockPoolDataDir(bpDataDirs, null);
+ final List<Callable<StorageDirectory>> callables = Lists.newArrayList();
final List<StorageDirectory> dirs = bpStorage.recoverTransitionRead(
- nsInfo, bpDataDirs, startOpt, datanode.getConf());
- for(StorageDirectory sd : dirs) {
- success.add(sd);
+ nsInfo, bpDataDirs, startOpt, callables, datanode.getConf());
+ if (callables.isEmpty()) {
+ for(StorageDirectory sd : dirs) {
+ success.add(sd);
+ }
+ } else {
+ for(Callable<StorageDirectory> c : callables) {
+ tasks.add(new UpgradeTask(dataDir, executor.submit(c)));
+ }
}
} catch (IOException e) {
LOG.warn("Failed to add storage directory " + dataDir
@@ -411,6 +474,20 @@ public class DataStorage extends Storage {
}
}
+ if (!tasks.isEmpty()) {
+ LOG.info("loadBlockPoolSliceStorage: " + tasks.size() + " upgrade tasks");
+ for(UpgradeTask t : tasks) {
+ try {
+ success.add(t.future.get());
+ } catch (ExecutionException e) {
+ LOG.warn("Failed to upgrade storage directory " + t.dataDir
+ + " for block pool " + bpid, e);
+ } catch (InterruptedException e) {
+ throw DFSUtilClient.toInterruptedIOException("Task interrupted", e);
+ }
+ }
+ }
+
return success;
}
@@ -655,7 +732,8 @@ public class DataStorage extends Storage {
* @return true if the new properties has been written.
*/
private boolean doTransition(StorageDirectory sd, NamespaceInfo nsInfo,
- StartupOption startOpt, Configuration conf) throws IOException {
+ StartupOption startOpt, List<Callable<StorageDirectory>> callables,
+ Configuration conf) throws IOException {
if (startOpt == StartupOption.ROLLBACK) {
doRollback(sd, nsInfo); // rollback if applicable
}
@@ -697,7 +775,7 @@ public class DataStorage extends Storage {
// simply update the properties.
upgradeProperties(sd);
} else {
- doUpgradePreFederation(sd, nsInfo, conf);
+ doUpgradePreFederation(sd, nsInfo, callables, conf);
}
return true; // doUgrade already has written properties
}
@@ -734,7 +812,9 @@ public class DataStorage extends Storage {
* @param sd storage directory
*/
void doUpgradePreFederation(final StorageDirectory sd,
- final NamespaceInfo nsInfo, final Configuration conf) throws IOException {
+ final NamespaceInfo nsInfo,
+ final List<Callable<StorageDirectory>> callables,
+ final Configuration conf) throws IOException {
final int oldLV = getLayoutVersion();
LOG.info("Upgrading storage directory " + sd.getRoot()
+ ".\n old LV = " + oldLV
@@ -767,10 +847,20 @@ public class DataStorage extends Storage {
bpStorage.format(curDir, nsInfo);
final File toDir = new File(curBpDir, STORAGE_DIR_CURRENT);
- doUgrade(sd, nsInfo, prevDir, tmpDir, bbwDir, toDir, oldLV, conf);
+ if (callables == null) {
+ doUpgrade(sd, nsInfo, prevDir, tmpDir, bbwDir, toDir, oldLV, conf);
+ } else {
+ callables.add(new Callable<StorageDirectory>() {
+ @Override
+ public StorageDirectory call() throws Exception {
+ doUpgrade(sd, nsInfo, prevDir, tmpDir, bbwDir, toDir, oldLV, conf);
+ return sd;
+ }
+ });
+ }
}
- private void doUgrade(final StorageDirectory sd,
+ private void doUpgrade(final StorageDirectory sd,
final NamespaceInfo nsInfo, final File prevDir,
final File tmpDir, final File bbwDir, final File toDir, final int oldLV,
Configuration conf) throws IOException {