Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2016/01/16 01:11:04 UTC
[38/43] hadoop git commit: HDFS-9624. DataNode start slowly due to
the initial DU command operations. (Lin Yiqun via wang)
HDFS-9624. DataNode start slowly due to the initial DU command operations. (Lin Yiqun via wang)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c07f7fa8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c07f7fa8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c07f7fa8
Branch: refs/heads/HDFS-1312
Commit: c07f7fa8ff752436726239d938e0461236839acf
Parents: a9c69eb
Author: Andrew Wang <wa...@apache.org>
Authored: Fri Jan 15 11:28:46 2016 -0800
Committer: Andrew Wang <wa...@apache.org>
Committed: Fri Jan 15 11:28:46 2016 -0800
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 5 +
.../datanode/fsdataset/impl/BlockPoolSlice.java | 34 ++++---
.../datanode/fsdataset/impl/FsDatasetImpl.java | 9 +-
.../datanode/fsdataset/impl/FsVolumeImpl.java | 13 ++-
.../src/main/resources/hdfs-default.xml | 13 +++
.../fsdataset/impl/TestFsDatasetImpl.java | 97 ++++++++++++++++++++
7 files changed, 161 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c07f7fa8/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 999c6c8..94f55b8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -933,6 +933,9 @@ Release 2.9.0 - UNRELEASED
HDFS-9350. Avoid creating temporary strings in Block.toString() and
getBlockName() (Staffan Friberg via cmccabe)
+ HDFS-9624. DataNode start slowly due to the initial DU command operations.
+ (Lin Yiqun via wang)
+
OPTIMIZATIONS
BUG FIXES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c07f7fa8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 22859ee..9dd251f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -125,6 +125,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_DATANODE_DUPLICATE_REPLICA_DELETION = "dfs.datanode.duplicate.replica.deletion";
public static final boolean DFS_DATANODE_DUPLICATE_REPLICA_DELETION_DEFAULT = true;
+ public static final String DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS =
+ "dfs.datanode.cached-dfsused.check.interval.ms";
+ public static final long DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_DEFAULT_MS =
+ 600000;
+
public static final String DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT =
"dfs.namenode.path.based.cache.block.map.allocation.percent";
public static final float DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT_DEFAULT = 0.25f;
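The new key defaults to 600000 ms (ten minutes), matching the previously hard-coded
value. As a minimal sketch (not part of this commit), a caller could also override
it programmatically; HdfsConfiguration and Configuration.setLong are standard
Hadoop APIs:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    // Keep the cached dfsUsed value trusted for one hour instead of the
    // 600000 ms (ten minute) default.
    Configuration conf = new HdfsConfiguration();
    conf.setLong(DFSConfigKeys.DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS,
        60 * 60 * 1000L);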
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c07f7fa8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index 571f085..188ab68 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -55,7 +55,7 @@ import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.ShutdownHookManager;
-import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.io.Files;
@@ -79,12 +79,15 @@ class BlockPoolSlice {
private final File rbwDir; // directory store RBW replica
private final File tmpDir; // directory store Temporary replica
private final int ioFileBufferSize;
- private static final String DU_CACHE_FILE = "dfsUsed";
+ @VisibleForTesting
+ public static final String DU_CACHE_FILE = "dfsUsed";
private volatile boolean dfsUsedSaved = false;
private static final int SHUTDOWN_HOOK_PRIORITY = 30;
private final boolean deleteDuplicateReplicas;
private static final String REPLICA_CACHE_FILE = "replicas";
private final long replicaCacheExpiry = 5*60*1000;
+ private final long cachedDfsUsedCheckTime;
+ private final Timer timer;
// TODO:FEDERATION scalability issue - a thread per DU is needed
private final DU dfsUsage;
@@ -95,10 +98,11 @@ class BlockPoolSlice {
* @param volume {@link FsVolumeImpl} to which this BlockPool belongs to
* @param bpDir directory corresponding to the BlockPool
* @param conf configuration
+ * @param timer Timer instance used to query the current time
* @throws IOException
*/
BlockPoolSlice(String bpid, FsVolumeImpl volume, File bpDir,
- Configuration conf) throws IOException {
+ Configuration conf, Timer timer) throws IOException {
this.bpid = bpid;
this.volume = volume;
this.currentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT);
@@ -117,6 +121,12 @@ class BlockPoolSlice {
DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION,
DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION_DEFAULT);
+ this.cachedDfsUsedCheckTime =
+ conf.getLong(
+ DFSConfigKeys.DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS,
+ DFSConfigKeys.DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_DEFAULT_MS);
+ this.timer = timer;
+
// Files that were being written when the datanode was last shutdown
// are now moved back to the data directory. It is possible that
// in the future, we might want to do some sort of datanode-local
@@ -187,11 +197,13 @@ class BlockPoolSlice {
dfsUsage.incDfsUsed(value);
}
- /**
- * Read in the cached DU value and return it if it is less than 600 seconds
- * old (DU update interval). Slight imprecision of dfsUsed is not critical
- * and skipping DU can significantly shorten the startup time.
- * If the cached value is not available or too old, -1 is returned.
+ /**
+ * Read in the cached DU value and return it if its age is less than
+ * cachedDfsUsedCheckTime, which is set by the
+ * dfs.datanode.cached-dfsused.check.interval.ms parameter. Slight
+ * imprecision of dfsUsed is not critical; skipping DU can significantly
+ * shorten the startup time. If the cached value is not available or is
+ * too old, -1 is returned.
*/
long loadDfsUsed() {
long cachedDfsUsed;
@@ -219,7 +231,7 @@ class BlockPoolSlice {
}
// Return the cached value if mtime is okay.
- if (mtime > 0 && (Time.now() - mtime < 600000L)) {
+ if (mtime > 0 && (timer.now() - mtime < cachedDfsUsedCheckTime)) {
FsDatasetImpl.LOG.info("Cached dfsUsed found for " + currentDir + ": " +
cachedDfsUsed);
return cachedDfsUsed;
@@ -245,7 +257,7 @@ class BlockPoolSlice {
try (Writer out = new OutputStreamWriter(
new FileOutputStream(outFile), "UTF-8")) {
// mtime is written last, so that truncated writes won't be valid.
- out.write(Long.toString(used) + " " + Long.toString(Time.now()));
+ out.write(Long.toString(used) + " " + Long.toString(timer.now()));
out.flush();
}
} catch (IOException ioe) {
@@ -434,7 +446,7 @@ class BlockPoolSlice {
try {
sc = new Scanner(restartMeta, "UTF-8");
// The restart meta file exists
- if (sc.hasNextLong() && (sc.nextLong() > Time.now())) {
+ if (sc.hasNextLong() && (sc.nextLong() > timer.now())) {
// It didn't expire. Load the replica as a RBW.
// We don't know the expected block length, so just use 0
// and don't reserve any more space for writes.
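To make the flow above concrete: saveDfsUsed() writes "<usedBytes> <mtime>" to the
dfsUsed cache file (mtime last, so truncated writes are invalid), and loadDfsUsed()
trusts the value only while the mtime is within cachedDfsUsedCheckTime. A minimal
reader sketch under those assumptions — the committed loadDfsUsed() adds error
handling and logging:

    import java.io.File;
    import java.io.FileNotFoundException;
    import java.util.Scanner;

    long loadCachedDfsUsed() throws FileNotFoundException {
      File cacheFile = new File(currentDir, DU_CACHE_FILE);
      try (Scanner sc = new Scanner(cacheFile, "UTF-8")) {
        long cachedDfsUsed = sc.nextLong(); // bytes used, written first
        long mtime = sc.nextLong();         // write time, written last
        if (mtime > 0 && timer.now() - mtime < cachedDfsUsedCheckTime) {
          return cachedDfsUsed;             // fresh enough: skip the du
        }
      }
      return -1;                            // stale cache: caller redoes du
    }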
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c07f7fa8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 5d987fe..5350974 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -113,6 +113,7 @@ import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.Timer;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@@ -127,6 +128,7 @@ import com.google.common.collect.Sets;
class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
static final Log LOG = LogFactory.getLog(FsDatasetImpl.class);
private final static boolean isNativeIOAvailable;
+ private Timer timer;
static {
isNativeIOAvailable = NativeIO.isAvailable();
if (Path.WINDOWS && !isNativeIOAvailable) {
@@ -433,7 +435,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
for (final NamespaceInfo nsInfo : nsInfos) {
String bpid = nsInfo.getBlockPoolID();
try {
- fsVolume.addBlockPool(bpid, this.conf);
+ fsVolume.addBlockPool(bpid, this.conf, this.timer);
fsVolume.getVolumeMap(bpid, tempVolumeMap, ramDiskReplicaTracker);
} catch (IOException e) {
LOG.warn("Caught exception when adding " + fsVolume +
@@ -3080,5 +3082,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
evictLazyPersistBlocks(bytesNeeded);
return cacheManager.reserve(bytesNeeded) > 0;
}
+
+ @VisibleForTesting
+ public void setTimer(Timer newTimer) {
+ this.timer = newTimer;
+ }
}
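The @VisibleForTesting setter is what lets a test substitute the clock before any
volume is added. A hedged usage sketch (mirroring the test added at the end of this
commit; FakeTimer is Hadoop's deterministic test clock):

    FakeTimer fakeTimer = new FakeTimer();
    dataset.setTimer(fakeTimer);      // new BlockPoolSlices will use this clock
    fakeTimer.advance(5000);          // age cached dfsUsed timestamps by 5 s
    dataset.addVolume(loc, nsInfos);  // loadDfsUsed() now compares fake times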
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c07f7fa8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index 8fd52c3..6b79073 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.CloseableReferenceCount;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.Timer;
import org.codehaus.jackson.annotate.JsonProperty;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
@@ -858,8 +859,18 @@ public class FsVolumeImpl implements FsVolumeSpi {
}
void addBlockPool(String bpid, Configuration conf) throws IOException {
+ addBlockPool(bpid, conf, null);
+ }
+
+ void addBlockPool(String bpid, Configuration conf, Timer timer)
+ throws IOException {
File bpdir = new File(currentDir, bpid);
- BlockPoolSlice bp = new BlockPoolSlice(bpid, this, bpdir, conf);
+ BlockPoolSlice bp;
+ if (timer == null) {
+ bp = new BlockPoolSlice(bpid, this, bpdir, conf, new Timer());
+ } else {
+ bp = new BlockPoolSlice(bpid, this, bpdir, conf, timer);
+ }
bpSlices.put(bpid, bp);
}
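The two-argument overload keeps existing callers source-compatible, while the
three-argument form lets tests inject a clock. The null branch above could
equivalently be collapsed into a single conditional expression, e.g.:

    BlockPoolSlice bp = new BlockPoolSlice(bpid, this, bpdir, conf,
        timer != null ? timer : new Timer());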
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c07f7fa8/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 98eb326..7607c32 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -2738,4 +2738,17 @@
reduces initial request failures after datanode restart.
</description>
</property>
+
+<property>
+ <name>dfs.datanode.cached-dfsused.check.interval.ms</name>
+ <value>600000</value>
+ <description>
+ The interval (in milliseconds) for which the cached DU_CACHE_FILE value
+ in each volume remains valid. During rolling upgrade operations, the
+ dfsUsed cache file of each volume usually expires, so every DataNode
+ redoes its du operations and starts slowly. Raising this property keeps
+ the cache file valid for as long as you need.
+ </description>
+</property>
+
</configuration>
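For deployments, the window can be widened in hdfs-site.xml; a hypothetical
override (not part of this commit) for DataNodes that may stay down for up to an
hour during a rolling upgrade:

    <property>
      <name>dfs.datanode.cached-dfsused.check.interval.ms</name>
      <value>3600000</value>
      <description>Trust the cached dfsUsed value for one hour.</description>
    </property>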
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c07f7fa8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index a3d5769..cdc1d61 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.util.FakeTimer;
import org.apache.hadoop.util.StringUtils;
import org.junit.Before;
import org.junit.Test;
@@ -57,13 +58,18 @@ import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -444,4 +450,95 @@ public class TestFsDatasetImpl {
assertSame(replica,
BlockPoolSlice.selectReplicaToDelete(replicaOtherNewer, replica));
}
+
+ @Test
+ public void testLoadingDfsUsedForVolumes() throws IOException,
+ InterruptedException {
+ long waitIntervalTime = 5000;
+ // Initialize cachedDfsUsedIntervalTime to be larger than waitIntervalTime
+ // so the cached dfsUsed value does not expire
+ long cachedDfsUsedIntervalTime = waitIntervalTime + 1000;
+ conf.setLong(DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS,
+ cachedDfsUsedIntervalTime);
+
+ long cacheDfsUsed = 1024;
+ long dfsUsed = getDfsUsedValueOfNewVolume(cacheDfsUsed, waitIntervalTime);
+
+ assertEquals(cacheDfsUsed, dfsUsed);
+ }
+
+ @Test
+ public void testLoadingDfsUsedForVolumesExpired() throws IOException,
+ InterruptedException {
+ long waitIntervalTime = 5000;
+ // Initialize cachedDfsUsedIntervalTime to be smaller than waitIntervalTime
+ // so the cached dfsUsed value expires
+ long cachedDfsUsedIntervalTime = waitIntervalTime - 1000;
+ conf.setLong(DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS,
+ cachedDfsUsedIntervalTime);
+
+ long cacheDfsUsed = 1024;
+ long dfsUsed = getDfsUsedValueOfNewVolume(cacheDfsUsed, waitIntervalTime);
+
+ // Because the cached dfsUsed expired, dfsUsed is recalculated and differs
+ assertTrue(cacheDfsUsed != dfsUsed);
+ }
+
+ private long getDfsUsedValueOfNewVolume(long cacheDfsUsed,
+ long waitIntervalTime) throws IOException, InterruptedException {
+ List<NamespaceInfo> nsInfos = Lists.newArrayList();
+ nsInfos.add(new NamespaceInfo(0, CLUSTER_ID, BLOCK_POOL_IDS[0], 1));
+
+ String CURRENT_DIR = "current";
+ String DU_CACHE_FILE = BlockPoolSlice.DU_CACHE_FILE;
+ String path = BASE_DIR + "/newData0";
+ String pathUri = new Path(path).toUri().toString();
+ StorageLocation loc = StorageLocation.parse(pathUri);
+ Storage.StorageDirectory sd = createStorageDirectory(new File(path));
+ DataStorage.VolumeBuilder builder =
+ new DataStorage.VolumeBuilder(storage, sd);
+ when(
+ storage.prepareVolume(eq(datanode), eq(loc.getFile()),
+ anyListOf(NamespaceInfo.class))).thenReturn(builder);
+
+ String cacheFilePath =
+ String.format("%s/%s/%s/%s/%s", path, CURRENT_DIR, BLOCK_POOL_IDS[0],
+ CURRENT_DIR, DU_CACHE_FILE);
+ File outFile = new File(cacheFilePath);
+
+ if (!outFile.getParentFile().exists()) {
+ outFile.getParentFile().mkdirs();
+ }
+
+ if (outFile.exists()) {
+ outFile.delete();
+ }
+
+ FakeTimer timer = new FakeTimer();
+ try {
+ try (Writer out =
+ new OutputStreamWriter(new FileOutputStream(outFile),
+ StandardCharsets.UTF_8)) {
+ // Write the dfsUsed value and the current time to the cache file
+ out.write(Long.toString(cacheDfsUsed) + " "
+ + Long.toString(timer.now()));
+ out.flush();
+ }
+ } catch (IOException ioe) {
+ }
+
+ dataset.setTimer(timer);
+ timer.advance(waitIntervalTime);
+ dataset.addVolume(loc, nsInfos);
+
+ // Get the last volume, which was just added above
+ FsVolumeImpl newVolume;
+ try (FsDatasetSpi.FsVolumeReferences volumes =
+ dataset.getFsVolumeReferences()) {
+ newVolume = (FsVolumeImpl) volumes.get(volumes.size() - 1);
+ }
+ long dfsUsed = newVolume.getDfsUsed();
+
+ return dfsUsed;
+ }
}