You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ki...@apache.org on 2020/10/21 06:00:48 UTC

[hadoop] branch trunk updated: HDFS-15618. Improve datanode shutdown latency. Contributed by Ahmed Hussein.

This is an automated email from the ASF dual-hosted git repository.

kihwal pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 88a9f42  HDFS-15618. Improve datanode shutdown latency. Contributed by Ahmed Hussein.
88a9f42 is described below

commit 88a9f42f320e7c16cf0b0b424283f8e4486ef286
Author: Kihwal Lee <ki...@apache.org>
AuthorDate: Wed Oct 21 00:59:35 2020 -0500

    HDFS-15618. Improve datanode shutdown latency. Contributed by Ahmed Hussein.
---
 .../java/org/apache/hadoop/hdfs/DFSConfigKeys.java |   8 ++
 .../hadoop/hdfs/server/datanode/BlockScanner.java  |  33 ++++-
 .../hadoop/hdfs/server/datanode/DataNode.java      |   4 +-
 .../hadoop/hdfs/server/datanode/VolumeScanner.java |   3 +
 .../server/datanode/VolumeScannerCBInjector.java   |  51 ++++++++
 .../src/main/resources/hdfs-default.xml            |   9 ++
 .../org/apache/hadoop/hdfs/MiniDFSCluster.java     |  28 +++-
 .../hdfs/server/datanode/TestBlockScanner.java     | 142 +++++++++++++++++++++
 8 files changed, 271 insertions(+), 7 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 6b242f0..f59455e 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -866,6 +866,14 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int     DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT = 21 * 24;  // 3 weeks.
   public static final String  DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND = "dfs.block.scanner.volume.bytes.per.second";
   public static final long    DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND_DEFAULT = 1048576L;
+  /**
+   * The amount of time in milliseconds that the BlockScanner times out waiting
+   * for the VolumeScanner thread to join during a shutdown call.
+   */
+  public static final String  DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY =
+      "dfs.block.scanner.volume.join.timeout.ms";
+  public static final long DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_DEFAULT =
+      TimeUnit.SECONDS.toMillis(5);
   public static final String  DFS_BLOCK_SCANNER_SKIP_RECENT_ACCESSED =
       "dfs.block.scanner.skip.recent.accessed";
   public static final boolean DFS_BLOCK_SCANNER_SKIP_RECENT_ACCESSED_DEFAULT =
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java
index 2895233..485cf00 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java
@@ -22,6 +22,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_SKIP_RECENT
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_SKIP_RECENT_ACCESSED_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT;
 
@@ -68,6 +70,12 @@ public class BlockScanner {
    */
   private Conf conf;
 
+  /**
+   * Timeout duration in milliseconds waiting for {@link VolumeScanner} to stop
+   * inside {@link #removeAllVolumeScanners}.
+   */
+  private long joinVolumeScannersTimeOutMs;
+
   @VisibleForTesting
   void setConf(Conf conf) {
     this.conf = conf;
@@ -185,6 +193,9 @@ public class BlockScanner {
 
   public BlockScanner(DataNode datanode, Configuration conf) {
     this.datanode = datanode;
+    setJoinVolumeScannersTimeOutMs(
+        conf.getLong(DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY,
+            DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_DEFAULT));
     this.conf = new Conf(conf);
     if (isEnabled()) {
       LOG.info("Initialized block scanner with targetBytesPerSec {}",
@@ -204,6 +215,13 @@ public class BlockScanner {
     return (conf.scanPeriodMs > 0) && (conf.targetBytesPerSec > 0);
   }
 
+  /**
+   * Reports whether the scanners map currently holds at least one entry.
+   */
+  public synchronized boolean hasAnyRegisteredScanner() {
+    return scanners.size() > 0;
+  }
+
  /**
   * Set up a scanner for the given block pool and volume.
   *
@@ -268,7 +286,10 @@ public class BlockScanner {
   /**
    * Stops and removes all volume scanners.
    *
-   * This function will block until all the volume scanners have stopped.
+   * This function is called on shutdown. It will return even if some of
+   * the scanners don't terminate in time. Since the scanners are daemon
+   * threads and do not alter the block content, it is safe to ignore
+   * such conditions on shutdown.
    */
   public synchronized void removeAllVolumeScanners() {
     for (Entry<String, VolumeScanner> entry : scanners.entrySet()) {
@@ -276,7 +297,7 @@ public class BlockScanner {
     }
     for (Entry<String, VolumeScanner> entry : scanners.entrySet()) {
       Uninterruptibles.joinUninterruptibly(entry.getValue(),
-          5, TimeUnit.MINUTES);
+          getJoinVolumeScannersTimeOutMs(), TimeUnit.MILLISECONDS);
     }
     scanners.clear();
   }
@@ -352,6 +373,14 @@ public class BlockScanner {
     scanner.markSuspectBlock(block);
   }
 
+  public long getJoinVolumeScannersTimeOutMs() {
+    return this.joinVolumeScannersTimeOutMs;
+  }
+  /** Sets the join timeout (ms) used by removeAllVolumeScanners(). */
+  public void setJoinVolumeScannersTimeOutMs(long timeOutMs) {
+    joinVolumeScannersTimeOutMs = timeOutMs;
+  }
+
   @InterfaceAudience.Private
   public static class Servlet extends HttpServlet {
     private static final long serialVersionUID = 1L;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 6ef1f7a..e2f98c3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -1660,7 +1660,9 @@ public class DataNode extends ReconfigurableBase
       // a block pool id
       String bpId = bpos.getBlockPoolId();
 
-      blockScanner.disableBlockPoolId(bpId);
+      if (blockScanner.hasAnyRegisteredScanner()) {
+        blockScanner.disableBlockPoolId(bpId);
+      }
 
       if (data != null) {
         data.shutdownBlockPool(bpId);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
index 5b92ac6..6bc25eb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
@@ -670,12 +670,14 @@ public class VolumeScanner extends Thread {
         LOG.error("{} exiting because of exception ", this, e);
       }
       LOG.info("{} exiting.", this);
+      VolumeScannerCBInjector.get().preSavingBlockIteratorTask(this);
       // Save the current position of all block iterators and close them.
       for (BlockIterator iter : blockIters) {
         saveBlockIterator(iter);
         IOUtils.cleanup(null, iter);
       }
     } finally {
+      VolumeScannerCBInjector.get().terminationCallBack(this);
       // When the VolumeScanner exits, release the reference we were holding
       // on the volume.  This will allow the volume to be removed later.
       IOUtils.cleanup(null, ref);
@@ -695,6 +697,7 @@ public class VolumeScanner extends Thread {
     stopping = true;
     notify();
     this.interrupt();
+    VolumeScannerCBInjector.get().shutdownCallBack(this);
   }
 
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScannerCBInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScannerCBInjector.java
new file mode 100644
index 0000000..d15d8d4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScannerCBInjector.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Callback hooks for {@link VolumeScanner} / {@link BlockScanner} tests.
+ * Tests swap the installed instance while scanner threads read it, so the
+ * field is volatile. All callbacks here are no-ops in production code.
+ */
+@VisibleForTesting
+@InterfaceAudience.Private
+public class VolumeScannerCBInjector {
+  private static volatile VolumeScannerCBInjector instance =
+      new VolumeScannerCBInjector();
+  /** Returns the injector currently installed; never {@code null}. */
+  public static VolumeScannerCBInjector get() {
+    return instance;
+  }
+  /** Installs {@code injector}; {@code null} restores the no-op default. */
+  public static void set(VolumeScannerCBInjector injector) {
+    instance = (injector != null) ? injector : new VolumeScannerCBInjector();
+  }
+  /** Called by a VolumeScanner on exit, before saving its block iterators. */
+  public void preSavingBlockIteratorTask(final VolumeScanner volumeScanner) {
+  }
+  /** Called after a VolumeScanner is flagged to stop and interrupted. */
+  public void shutdownCallBack(final VolumeScanner volumeScanner) {
+  }
+  /** Called from the VolumeScanner thread's finally block as it exits. */
+  public void terminationCallBack(final VolumeScanner volumeScanner) {
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index c6d4b6f..c2e0894 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -1616,6 +1616,15 @@
 </property>
 
 <property>
+  <name>dfs.block.scanner.volume.join.timeout.ms</name>
+  <value>5000</value>
+  <description>
+    The amount of time in milliseconds that the BlockScanner times out waiting
+    for the VolumeScanner thread to join during a shutdown call.
+  </description>
+</property>
+
+<property>
   <name>dfs.datanode.readahead.bytes</name>
   <value>4194304</value>
   <description>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index 48697c7..71e73ec 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -22,6 +22,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.NET_TOPOLOGY_NO
 import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_DEFAULT;
 import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
@@ -80,6 +81,7 @@ import org.apache.hadoop.thirdparty.com.google.common.collect.ArrayListMultimap;
 import org.apache.hadoop.thirdparty.com.google.common.collect.Multimap;
 import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
 import org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.InMemoryLevelDBAliasMapClient;
+import org.apache.hadoop.hdfs.server.datanode.VolumeScanner;
 import org.apache.hadoop.hdfs.server.namenode.ImageServlet;
 import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
@@ -171,6 +173,13 @@ public class MiniDFSCluster implements AutoCloseable {
       = DFS_NAMENODE_SAFEMODE_EXTENSION_KEY + ".testing";
   public static final String  DFS_NAMENODE_DECOMMISSION_INTERVAL_TESTING_KEY
       = DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY + ".testing";
+  /**
+   * For JUnit tests, this is the default value of the amount of time
+   * in milliseconds that the BlockScanner times out waiting for the
+   * {@link VolumeScanner} thread to join during a shutdown call.
+   */
+  public static final long DEFAULT_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC =
+      TimeUnit.SECONDS.toMillis(30);
 
   // Changing this default may break some tests that assume it is 2.
   private static final int DEFAULT_STORAGES_PER_DATANODE = 2;
@@ -217,8 +226,7 @@ public class MiniDFSCluster implements AutoCloseable {
 
     public Builder(Configuration conf) {
       this.conf = conf;
-      this.storagesPerDatanode =
-          FsDatasetTestUtils.Factory.getFactory(conf).getDefaultNumOfDataDirs();
+      initDefaultConfigurations();
       if (null == conf.get(HDFS_MINIDFS_BASEDIR)) {
         conf.set(HDFS_MINIDFS_BASEDIR,
             new File(getBaseDirectory()).getAbsolutePath());
@@ -227,8 +235,7 @@ public class MiniDFSCluster implements AutoCloseable {
 
     public Builder(Configuration conf, File basedir) {
       this.conf = conf;
-      this.storagesPerDatanode =
-          FsDatasetTestUtils.Factory.getFactory(conf).getDefaultNumOfDataDirs();
+      initDefaultConfigurations();
       if (null == basedir) {
         throw new IllegalArgumentException(
             "MiniDFSCluster base directory cannot be null");
@@ -492,6 +499,19 @@ public class MiniDFSCluster implements AutoCloseable {
     public MiniDFSCluster build() throws IOException {
       return new MiniDFSCluster(this);
     }
+
+    /**
+     * Applies the MiniDFSCluster test defaults to {@code conf} and derives
+     * the default number of storages per DataNode.
+     */
+    private void initDefaultConfigurations() {
+      // Keep a user-supplied join timeout; otherwise pin the JUnit default.
+      conf.setLong(DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY,
+          conf.getLong(DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY,
+              DEFAULT_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC));
+      storagesPerDatanode = FsDatasetTestUtils.Factory.getFactory(conf)
+          .getDefaultNumOfDataDirs();
+    }
   }
   
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java
index 75d1c44..b34b7df 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_SKIP_RECENT_ACCESSED;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND;
 import static org.apache.hadoop.hdfs.server.datanode.BlockScanner.Conf.INTERNAL_DFS_DATANODE_SCAN_PERIOD_MS;
@@ -38,6 +39,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import java.util.function.Supplier;
@@ -96,9 +98,19 @@ public class TestBlockScanner {
     TestContext(Configuration conf, int numNameServices) throws Exception {
       this.numNameServices = numNameServices;
       File basedir = new File(GenericTestUtils.getRandomizedTempPath());
+      long volumeScannerTimeOutFromConf =
+          conf.getLong(DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY, -1);
+      long expectedVScannerTimeOut =
+          volumeScannerTimeOutFromConf == -1
+              ? MiniDFSCluster.DEFAULT_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC
+              : volumeScannerTimeOutFromConf;
       MiniDFSCluster.Builder bld = new MiniDFSCluster.Builder(conf, basedir).
           numDataNodes(1).
           storagesPerDatanode(1);
+      // verify that the builder was initialized to get the default
+      // configuration designated for Junit tests.
+      assertEquals(expectedVScannerTimeOut,
+            conf.getLong(DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY, -1));
       if (numNameServices > 1) {
         bld.nnTopology(MiniDFSNNTopology.
               simpleFederatedTopology(numNameServices));
@@ -1012,4 +1024,134 @@ public class TestBlockScanner {
         0, info.blocksScanned);
     ctx.close();
   }
+
+  /**
+   * Verifies that DataNode shutdown does not wait for the VolumeScanners:
+   * the 50ms join timeout is well below the 1000ms scanner exit delay.
+   *
+   * @throws Exception propagated from the shared shutdown test helper
+   */
+  @Test(timeout=120000)
+  public void testFastDatanodeShutdown() throws Exception {
+    final long joinTimeoutMs = 50L;
+    final long scannerDelayMs = 1000L;
+    testDatanodeShutDown(joinTimeoutMs, scannerDelayMs, true);
+  }
+
+  /**
+   * Verifies that DataNode shutdown waits for the VolumeScanners when the
+   * 5-minute join timeout exceeds the 1000ms scanner exit delay.
+   *
+   * @throws Exception propagated from the shared shutdown test helper
+   */
+  @Test(timeout=120000)
+  public void testSlowDatanodeShutdown() throws Exception {
+    final long joinTimeoutMs = TimeUnit.MINUTES.toMillis(5);
+    final long scannerDelayMs = 1000L;
+    testDatanodeShutDown(joinTimeoutMs, scannerDelayMs, false);
+  }
+
+  /** Times DN shutdown while VolumeScanner exit is delayed by delayMS. */
+  private void testDatanodeShutDown(final long joinTimeOutMS,
+      final long delayMS, boolean isFastShutdown) throws Exception {
+    VolumeScannerCBInjector prevVolumeScannerCBInject =
+        VolumeScannerCBInjector.get();
+    TestContext ctx = null;
+    try {
+      DelayVolumeScannerResponseToInterrupt injectDelay =
+          new DelayVolumeScannerResponseToInterrupt(delayMS);
+      VolumeScannerCBInjector.set(injectDelay);
+      Configuration conf = new Configuration();
+      conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 100L);
+      conf.set(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER,
+          TestScanResultHandler.class.getName());
+      conf.setLong(INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS, 0L);
+      conf.setLong(DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY,
+          joinTimeOutMS);
+      ctx = new TestContext(conf, 1);
+      final int numExpectedBlocks = 10;
+      ctx.createFiles(0, numExpectedBlocks, 1);
+      final TestScanResultHandler.Info info =
+          TestScanResultHandler.getInfo(ctx.volumes.get(0));
+      synchronized (info) {
+        info.sem = new Semaphore(5);
+        info.shouldRun = true;
+        info.notify();
+      }
+      // make sure that the scanners are doing progress
+      GenericTestUtils.waitFor(() -> {
+        synchronized (info) {
+          return info.blocksScanned >= 1;
+        }
+      }, 3, 30000);
+      // Time only the DataNode shutdown call.
+      long startShutdownTime = Time.monotonicNow();
+      ctx.datanode.shutdown();
+      long totalTimeShutdown = Time.monotonicNow() - startShutdownTime;
+      if (isFastShutdown) {
+        assertTrue("total shutdown time of DN must be smaller than "
+                + "VolumeScanner Response time: " + totalTimeShutdown,
+            totalTimeShutdown < delayMS
+                && totalTimeShutdown >= joinTimeOutMS);
+        // wait for scanners to terminate before we move to the next test.
+        injectDelay.waitForScanners();
+      } else {
+        assertTrue("total shutdown time of DN must be larger than " +
+                "VolumeScanner Response time: " + totalTimeShutdown,
+            totalTimeShutdown >= delayMS
+                && totalTimeShutdown < joinTimeOutMS);
+      }
+    } finally {
+      // Restore the callback injector, then tear down the mini cluster.
+      VolumeScannerCBInjector.set(prevVolumeScannerCBInject);
+      if (ctx != null) {
+        ctx.close();
+      }
+    }
+  }
+
+  private static class DelayVolumeScannerResponseToInterrupt extends
+      VolumeScannerCBInjector {
+    /** Busy-wait applied before a scanner saves its block iterators. */
+    private final long delayAmountNS;
+    /** Scanners that were told to stop but have not terminated yet. */
+    private final Set<VolumeScanner> scannersToShutDown;
+    DelayVolumeScannerResponseToInterrupt(long delayMS) {
+      delayAmountNS = TimeUnit.MILLISECONDS.toNanos(delayMS);
+      scannersToShutDown = ConcurrentHashMap.newKeySet();
+    }
+
+    @Override
+    public void preSavingBlockIteratorTask(VolumeScanner volumeScanner) {
+      // Busy delay without sleep(): the scanner thread may already have
+      // been interrupted, in which case sleep() would throw at once.
+      final long startTime = Time.monotonicNowNanos();
+      final long endTime = startTime + delayAmountNS;
+      while (Time.monotonicNowNanos() < endTime) {
+        // spin until the configured delay has elapsed.
+      }
+      final long waitTimeNS = Time.monotonicNowNanos() - startTime;
+      // Log the elapsed time in milliseconds (nanos -> millis).
+      LOG.info("VolumeScanner {} finished delayed Task after {}",
+          volumeScanner.toString(),
+          TimeUnit.NANOSECONDS.toMillis(waitTimeNS));
+    }
+
+    @Override
+    public void shutdownCallBack(VolumeScanner volumeScanner) {
+      scannersToShutDown.add(volumeScanner);
+    }
+
+    @Override
+    public void terminationCallBack(VolumeScanner volumeScanner) {
+      scannersToShutDown.remove(volumeScanner);
+    }
+
+    /** Waits (up to 120s) until every stopped scanner has terminated. */
+    public void waitForScanners() throws TimeoutException,
+        InterruptedException {
+      GenericTestUtils.waitFor(
+          () -> scannersToShutDown.isEmpty(), 10, 120000);
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org