Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2016/01/16 01:11:04 UTC

[38/43] hadoop git commit: HDFS-9624. DataNode starts slowly due to the initial DU command operations. (Lin Yiqun via wang)

HDFS-9624. DataNode starts slowly due to the initial DU command operations. (Lin Yiqun via wang)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c07f7fa8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c07f7fa8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c07f7fa8

Branch: refs/heads/HDFS-1312
Commit: c07f7fa8ff752436726239d938e0461236839acf
Parents: a9c69eb
Author: Andrew Wang <wa...@apache.org>
Authored: Fri Jan 15 11:28:46 2016 -0800
Committer: Andrew Wang <wa...@apache.org>
Committed: Fri Jan 15 11:28:46 2016 -0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |  3 +
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |  5 +
 .../datanode/fsdataset/impl/BlockPoolSlice.java | 34 ++++---
 .../datanode/fsdataset/impl/FsDatasetImpl.java  |  9 +-
 .../datanode/fsdataset/impl/FsVolumeImpl.java   | 13 ++-
 .../src/main/resources/hdfs-default.xml         | 13 +++
 .../fsdataset/impl/TestFsDatasetImpl.java       | 97 ++++++++++++++++++++
 7 files changed, 161 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
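
In outline, the change lets a DataNode reuse the dfsUsed value that each block pool slice persists in its dfsUsed cache file, as long as the cached value is younger than the new dfs.datanode.cached-dfsused.check.interval.ms setting (600000 ms by default); only a missing or stale cache triggers a fresh DU run at startup. The standalone sketch below illustrates that idea only; the class and member names are illustrative and this is not the actual BlockPoolSlice code.

    // Minimal sketch of the caching idea: persist the last computed dfsUsed
    // value together with a timestamp, and on startup reuse it only while it
    // is younger than a configurable interval. Names here are illustrative.
    import java.io.File;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.util.NoSuchElementException;
    import java.util.Scanner;

    public class CachedDfsUsedSketch {
      private final File cacheFile;     // e.g. <bpDir>/current/dfsUsed
      private final long maxCacheAgeMs; // the new interval property

      public CachedDfsUsedSketch(File cacheFile, long maxCacheAgeMs) {
        this.cacheFile = cacheFile;
        this.maxCacheAgeMs = maxCacheAgeMs;
      }

      /** Persist the current usage; the timestamp is written last. */
      public void save(long dfsUsed) throws IOException {
        String line = dfsUsed + " " + System.currentTimeMillis();
        Files.write(cacheFile.toPath(), line.getBytes(StandardCharsets.UTF_8));
      }

      /** Return the cached usage if it is fresh enough, otherwise -1. */
      public long load() {
        try (Scanner sc = new Scanner(cacheFile, "UTF-8")) {
          long cachedDfsUsed = sc.nextLong();
          long mtime = sc.nextLong();
          if (mtime > 0 && System.currentTimeMillis() - mtime < maxCacheAgeMs) {
            return cachedDfsUsed; // skip the expensive du at startup
          }
        } catch (IOException | NoSuchElementException e) {
          // fall through: missing or malformed cache file
        }
        return -1; // caller falls back to running du
      }
    }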


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c07f7fa8/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 999c6c8..94f55b8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -933,6 +933,9 @@ Release 2.9.0 - UNRELEASED
     HDFS-9350. Avoid creating temprorary strings in Block.toString() and
     getBlockName() (Staffan Friberg via cmccabe)
 
+    HDFS-9624. DataNode starts slowly due to the initial DU command operations.
+    (Lin Yiqun via wang)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c07f7fa8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 22859ee..9dd251f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -125,6 +125,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_DATANODE_DUPLICATE_REPLICA_DELETION = "dfs.datanode.duplicate.replica.deletion";
   public static final boolean DFS_DATANODE_DUPLICATE_REPLICA_DELETION_DEFAULT = true;
 
+  public static final String DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS =
+      "dfs.datanode.cached-dfsused.check.interval.ms";
+  public static final long DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_DEFAULT_MS =
+      600000;
+
   public static final String  DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT =
     "dfs.namenode.path.based.cache.block.map.allocation.percent";
   public static final float    DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT_DEFAULT = 0.25f;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c07f7fa8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index 571f085..188ab68 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -55,7 +55,7 @@ import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.ShutdownHookManager;
-import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.Timer;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.io.Files;
@@ -79,12 +79,15 @@ class BlockPoolSlice {
   private final File rbwDir; // directory store RBW replica
   private final File tmpDir; // directory store Temporary replica
   private final int ioFileBufferSize;
-  private static final String DU_CACHE_FILE = "dfsUsed";
+  @VisibleForTesting
+  public static final String DU_CACHE_FILE = "dfsUsed";
   private volatile boolean dfsUsedSaved = false;
   private static final int SHUTDOWN_HOOK_PRIORITY = 30;
   private final boolean deleteDuplicateReplicas;
   private static final String REPLICA_CACHE_FILE = "replicas";
   private final long replicaCacheExpiry = 5*60*1000;
+  private final long cachedDfsUsedCheckTime;
+  private final Timer timer;
 
   // TODO:FEDERATION scalability issue - a thread per DU is needed
   private final DU dfsUsage;
@@ -95,10 +98,11 @@ class BlockPoolSlice {
    * @param volume {@link FsVolumeImpl} to which this BlockPool belongs to
    * @param bpDir directory corresponding to the BlockPool
    * @param conf configuration
+   * @param timer Timer instance used to obtain the current time
    * @throws IOException
    */
   BlockPoolSlice(String bpid, FsVolumeImpl volume, File bpDir,
-      Configuration conf) throws IOException {
+      Configuration conf, Timer timer) throws IOException {
     this.bpid = bpid;
     this.volume = volume;
     this.currentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT); 
@@ -117,6 +121,12 @@ class BlockPoolSlice {
         DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION,
         DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION_DEFAULT);
 
+    this.cachedDfsUsedCheckTime =
+        conf.getLong(
+            DFSConfigKeys.DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS,
+            DFSConfigKeys.DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_DEFAULT_MS);
+    this.timer = timer;
+
     // Files that were being written when the datanode was last shutdown
     // are now moved back to the data directory. It is possible that
     // in the future, we might want to do some sort of datanode-local
@@ -187,11 +197,13 @@ class BlockPoolSlice {
     dfsUsage.incDfsUsed(value);
   }
   
-   /**
-   * Read in the cached DU value and return it if it is less than 600 seconds
-   * old (DU update interval). Slight imprecision of dfsUsed is not critical
-   * and skipping DU can significantly shorten the startup time.
-   * If the cached value is not available or too old, -1 is returned.
+  /**
+   * Read in the cached DU value and return it if it is no older than
+   * cachedDfsUsedCheckTime, which is set by the
+   * dfs.datanode.cached-dfsused.check.interval.ms parameter. Slight
+   * imprecision of dfsUsed is not critical, and skipping DU can significantly
+   * shorten the startup time. If the cached value is not available or is too
+   * old, -1 is returned.
    */
   long loadDfsUsed() {
     long cachedDfsUsed;
@@ -219,7 +231,7 @@ class BlockPoolSlice {
       }
 
       // Return the cached value if mtime is okay.
-      if (mtime > 0 && (Time.now() - mtime < 600000L)) {
+      if (mtime > 0 && (timer.now() - mtime < cachedDfsUsedCheckTime)) {
         FsDatasetImpl.LOG.info("Cached dfsUsed found for " + currentDir + ": " +
             cachedDfsUsed);
         return cachedDfsUsed;
@@ -245,7 +257,7 @@ class BlockPoolSlice {
       try (Writer out = new OutputStreamWriter(
           new FileOutputStream(outFile), "UTF-8")) {
         // mtime is written last, so that truncated writes won't be valid.
-        out.write(Long.toString(used) + " " + Long.toString(Time.now()));
+        out.write(Long.toString(used) + " " + Long.toString(timer.now()));
         out.flush();
       }
     } catch (IOException ioe) {
@@ -434,7 +446,7 @@ class BlockPoolSlice {
       try {
         sc = new Scanner(restartMeta, "UTF-8");
         // The restart meta file exists
-        if (sc.hasNextLong() && (sc.nextLong() > Time.now())) {
+        if (sc.hasNextLong() && (sc.nextLong() > timer.now())) {
           // It didn't expire. Load the replica as a RBW.
           // We don't know the expected block length, so just use 0
           // and don't reserve any more space for writes.
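
The hunks above also replace the static Time.now() calls with an injected org.apache.hadoop.util.Timer. The point of that refactoring is testability: production code keeps using a real Timer, while a test can pass a FakeTimer and advance it deterministically instead of sleeping, which is what the new TestFsDatasetImpl cases do via dataset.setTimer(). The fragment below is only an illustration of that pattern, not the real BlockPoolSlice code; it assumes nothing beyond the Timer/FakeTimer methods already used by this patch (now() and advance()).

    // Illustrative freshness check with an injectable clock.
    import org.apache.hadoop.util.Timer;

    class FreshnessCheckSketch {
      private final Timer timer; // real Timer in production, FakeTimer in tests
      private final long maxAgeMs;

      FreshnessCheckSketch(Timer timer, long maxAgeMs) {
        this.timer = timer;
        this.maxAgeMs = maxAgeMs;
      }

      boolean isFresh(long mtime) {
        return mtime > 0 && timer.now() - mtime < maxAgeMs;
      }
    }

    // In a test (FakeTimer lives in the hadoop-common test jar):
    //   FakeTimer timer = new FakeTimer();
    //   FreshnessCheckSketch check = new FreshnessCheckSketch(timer, 600000L);
    //   long written = timer.now();
    //   timer.advance(700000L);          // simulate 700 seconds passing
    //   assertFalse(check.isFresh(written));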

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c07f7fa8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 5d987fe..5350974 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -113,6 +113,7 @@ import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.Timer;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -127,6 +128,7 @@ import com.google.common.collect.Sets;
 class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   static final Log LOG = LogFactory.getLog(FsDatasetImpl.class);
   private final static boolean isNativeIOAvailable;
+  private Timer timer;
   static {
     isNativeIOAvailable = NativeIO.isAvailable();
     if (Path.WINDOWS && !isNativeIOAvailable) {
@@ -433,7 +435,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     for (final NamespaceInfo nsInfo : nsInfos) {
       String bpid = nsInfo.getBlockPoolID();
       try {
-        fsVolume.addBlockPool(bpid, this.conf);
+        fsVolume.addBlockPool(bpid, this.conf, this.timer);
         fsVolume.getVolumeMap(bpid, tempVolumeMap, ramDiskReplicaTracker);
       } catch (IOException e) {
         LOG.warn("Caught exception when adding " + fsVolume +
@@ -3080,5 +3082,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     evictLazyPersistBlocks(bytesNeeded);
     return cacheManager.reserve(bytesNeeded) > 0;
   }
+
+  @VisibleForTesting
+  public void setTimer(Timer newTimer) {
+    this.timer = newTimer;
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c07f7fa8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index 8fd52c3..6b79073 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.CloseableReferenceCount;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.Timer;
 import org.codehaus.jackson.annotate.JsonProperty;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
@@ -858,8 +859,18 @@ public class FsVolumeImpl implements FsVolumeSpi {
   }
 
   void addBlockPool(String bpid, Configuration conf) throws IOException {
+    addBlockPool(bpid, conf, null);
+  }
+
+  void addBlockPool(String bpid, Configuration conf, Timer timer)
+      throws IOException {
     File bpdir = new File(currentDir, bpid);
-    BlockPoolSlice bp = new BlockPoolSlice(bpid, this, bpdir, conf);
+    BlockPoolSlice bp;
+    if (timer == null) {
+      bp = new BlockPoolSlice(bpid, this, bpdir, conf, new Timer());
+    } else {
+      bp = new BlockPoolSlice(bpid, this, bpdir, conf, timer);
+    }
     bpSlices.put(bpid, bp);
   }
   

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c07f7fa8/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 98eb326..7607c32 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -2738,4 +2738,17 @@
     reduces initial request failures after datanode restart.
   </description>
 </property>
+
+<property>
+  <name>dfs.datanode.cached-dfsused.check.interval.ms</name>
+  <value>600000</value>
+  <description>
+    The interval, in milliseconds, for which the cached DU_CACHE_FILE in
+    each volume is considered valid. During rolling upgrade operations the
+    dfsUsed cache file of each volume usually expires, forcing the datanode
+    to redo the du operations and therefore start slowly. Adjust this
+    property to control how long the cache file remains usable.
+  </description>
+</property>
+
 </configuration>
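
Operators would normally override the new property in hdfs-site.xml; the snippet below shows the programmatic equivalent, using the same Configuration.setLong call that the new test uses. The 12-hour value is purely illustrative, chosen to keep the cached dfsUsed file trusted across a long rolling-upgrade window.

    // Hypothetical example of raising the cached-dfsused interval; only the
    // 12-hour value is an assumption, the keys come from this patch.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class CachedDfsUsedConfigExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Default is 600000 ms (10 minutes); raise it so the dfsUsed cache
        // written at shutdown is still trusted when the datanode restarts.
        conf.setLong(DFSConfigKeys.DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS,
            12L * 60 * 60 * 1000);
        System.out.println(conf.getLong(
            DFSConfigKeys.DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS,
            DFSConfigKeys.DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_DEFAULT_MS));
      }
    }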

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c07f7fa8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index a3d5769..cdc1d61 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.util.FakeTimer;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.Before;
 import org.junit.Test;
@@ -57,13 +58,18 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -444,4 +450,95 @@ public class TestFsDatasetImpl {
     assertSame(replica,
         BlockPoolSlice.selectReplicaToDelete(replicaOtherNewer, replica));
   }
+
+  @Test
+  public void testLoadingDfsUsedForVolumes() throws IOException,
+      InterruptedException {
+    long waitIntervalTime = 5000;
+    // Set cachedDfsUsedIntervalTime larger than waitIntervalTime
+    // so that the cached dfsUsed value does not expire
+    long cachedDfsUsedIntervalTime = waitIntervalTime + 1000;
+    conf.setLong(DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS,
+        cachedDfsUsedIntervalTime);
+
+    long cacheDfsUsed = 1024;
+    long dfsUsed = getDfsUsedValueOfNewVolume(cacheDfsUsed, waitIntervalTime);
+
+    assertEquals(cacheDfsUsed, dfsUsed);
+  }
+
+  @Test
+  public void testLoadingDfsUsedForVolumesExpired() throws IOException,
+      InterruptedException {
+    long waitIntervalTime = 5000;
+    // Set cachedDfsUsedIntervalTime smaller than waitIntervalTime
+    // so that the cached dfsUsed value expires
+    long cachedDfsUsedIntervalTime = waitIntervalTime - 1000;
+    conf.setLong(DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS,
+        cachedDfsUsedIntervalTime);
+
+    long cacheDfsUsed = 1024;
+    long dfsUsed = getDfsUsedValueOfNewVolume(cacheDfsUsed, waitIntervalTime);
+
+    // The cached dfsUsed value has expired, so dfsUsed is recalculated
+    assertTrue(cacheDfsUsed != dfsUsed);
+  }
+
+  private long getDfsUsedValueOfNewVolume(long cacheDfsUsed,
+      long waitIntervalTime) throws IOException, InterruptedException {
+    List<NamespaceInfo> nsInfos = Lists.newArrayList();
+    nsInfos.add(new NamespaceInfo(0, CLUSTER_ID, BLOCK_POOL_IDS[0], 1));
+
+    String CURRENT_DIR = "current";
+    String DU_CACHE_FILE = BlockPoolSlice.DU_CACHE_FILE;
+    String path = BASE_DIR + "/newData0";
+    String pathUri = new Path(path).toUri().toString();
+    StorageLocation loc = StorageLocation.parse(pathUri);
+    Storage.StorageDirectory sd = createStorageDirectory(new File(path));
+    DataStorage.VolumeBuilder builder =
+        new DataStorage.VolumeBuilder(storage, sd);
+    when(
+        storage.prepareVolume(eq(datanode), eq(loc.getFile()),
+            anyListOf(NamespaceInfo.class))).thenReturn(builder);
+
+    String cacheFilePath =
+        String.format("%s/%s/%s/%s/%s", path, CURRENT_DIR, BLOCK_POOL_IDS[0],
+            CURRENT_DIR, DU_CACHE_FILE);
+    File outFile = new File(cacheFilePath);
+
+    if (!outFile.getParentFile().exists()) {
+      outFile.getParentFile().mkdirs();
+    }
+
+    if (outFile.exists()) {
+      outFile.delete();
+    }
+
+    FakeTimer timer = new FakeTimer();
+    try {
+      try (Writer out =
+          new OutputStreamWriter(new FileOutputStream(outFile),
+              StandardCharsets.UTF_8)) {
+        // Write the dfsUsed value and the current time to the cache file
+        out.write(Long.toString(cacheDfsUsed) + " "
+            + Long.toString(timer.now()));
+        out.flush();
+      }
+    } catch (IOException ioe) {
+    }
+
+    dataset.setTimer(timer);
+    timer.advance(waitIntervalTime);
+    dataset.addVolume(loc, nsInfos);
+
+    // Get the last volume, which was just added above
+    FsVolumeImpl newVolume;
+    try (FsDatasetSpi.FsVolumeReferences volumes =
+        dataset.getFsVolumeReferences()) {
+      newVolume = (FsVolumeImpl) volumes.get(volumes.size() - 1);
+    }
+    long dfsUsed = newVolume.getDfsUsed();
+
+    return dfsUsed;
+  }
 }