You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ki...@apache.org on 2020/10/22 14:56:50 UTC
[hadoop] branch branch-3.3 updated: HDFS-15618. Improve datanode
shutdown latency. Contributed by Ahmed Hussein.
This is an automated email from the ASF dual-hosted git repository.
kihwal pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new cf932a7 HDFS-15618. Improve datanode shutdown latency. Contributed by Ahmed Hussein.
cf932a7 is described below
commit cf932a7e2d6182471df4eba1333737912a32534b
Author: Kihwal Lee <ki...@apache.org>
AuthorDate: Thu Oct 22 09:55:28 2020 -0500
HDFS-15618. Improve datanode shutdown latency. Contributed by Ahmed Hussein.
---
.../java/org/apache/hadoop/hdfs/DFSConfigKeys.java | 8 ++
.../hadoop/hdfs/server/datanode/BlockScanner.java | 33 ++++-
.../hadoop/hdfs/server/datanode/DataNode.java | 4 +-
.../hadoop/hdfs/server/datanode/VolumeScanner.java | 3 +
.../server/datanode/VolumeScannerCBInjector.java | 51 ++++++++
.../src/main/resources/hdfs-default.xml | 9 ++
.../org/apache/hadoop/hdfs/MiniDFSCluster.java | 28 +++-
.../hdfs/server/datanode/TestBlockScanner.java | 142 +++++++++++++++++++++
8 files changed, 271 insertions(+), 7 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 4b8c27b..5264799 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -846,6 +846,14 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT = 21 * 24; // 3 weeks.
public static final String DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND = "dfs.block.scanner.volume.bytes.per.second";
public static final long DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND_DEFAULT = 1048576L;
+ /**
+ * The amount of time in milliseconds that the BlockScanner times out waiting
+ * for the VolumeScanner thread to join during a shutdown call.
+ */
+ public static final String DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY =
+ "dfs.block.scanner.volume.join.timeout.ms";
+ public static final long DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_DEFAULT =
+ TimeUnit.SECONDS.toMillis(5);
public static final String DFS_BLOCK_SCANNER_SKIP_RECENT_ACCESSED =
"dfs.block.scanner.skip.recent.accessed";
public static final boolean DFS_BLOCK_SCANNER_SKIP_RECENT_ACCESSED_DEFAULT =
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java
index 82efcf8..dc619f2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java
@@ -22,6 +22,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_SKIP_RECENT
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_SKIP_RECENT_ACCESSED_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT;
@@ -68,6 +70,12 @@ public class BlockScanner {
*/
private Conf conf;
+ /**
+ * Timeout duration in milliseconds waiting for {@link VolumeScanner} to stop
+ * inside {@link #removeAllVolumeScanners}.
+ */
+ private long joinVolumeScannersTimeOutMs;
+
@VisibleForTesting
void setConf(Conf conf) {
this.conf = conf;
@@ -185,6 +193,9 @@ public class BlockScanner {
public BlockScanner(DataNode datanode, Configuration conf) {
this.datanode = datanode;
+ setJoinVolumeScannersTimeOutMs(
+ conf.getLong(DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY,
+ DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_DEFAULT));
this.conf = new Conf(conf);
if (isEnabled()) {
LOG.info("Initialized block scanner with targetBytesPerSec {}",
@@ -204,6 +215,13 @@ public class BlockScanner {
return (conf.scanPeriodMs > 0) && (conf.targetBytesPerSec > 0);
}
+ /**
+ * Reports whether any volume scanner is currently registered.
+ * The check is synchronized on this BlockScanner instance.
+ *
+ * @return true if {@code scanners} is non-empty; false otherwise.
+ */
+ public synchronized boolean hasAnyRegisteredScanner() {
+ return !scanners.isEmpty();
+ }
+
/**
* Set up a scanner for the given block pool and volume.
*
@@ -268,7 +286,10 @@ public class BlockScanner {
/**
* Stops and removes all volume scanners.
*
- * This function will block until all the volume scanners have stopped.
+ * This function is called on shutdown. It will return even if some of
+ * the scanners don't terminate in time. Since the scanners are daemon
+ * threads and do not alter the block content, it is safe to ignore
+ * such conditions on shutdown.
*/
public synchronized void removeAllVolumeScanners() {
for (Entry<String, VolumeScanner> entry : scanners.entrySet()) {
@@ -276,7 +297,7 @@ public class BlockScanner {
}
for (Entry<String, VolumeScanner> entry : scanners.entrySet()) {
Uninterruptibles.joinUninterruptibly(entry.getValue(),
- 5, TimeUnit.MINUTES);
+ getJoinVolumeScannersTimeOutMs(), TimeUnit.MILLISECONDS);
}
scanners.clear();
}
@@ -352,6 +373,14 @@ public class BlockScanner {
scanner.markSuspectBlock(block);
}
+ /**
+ * @return the timeout, in milliseconds, used when joining each
+ * {@link VolumeScanner} thread in {@link #removeAllVolumeScanners}.
+ */
+ public long getJoinVolumeScannersTimeOutMs() {
+ return joinVolumeScannersTimeOutMs;
+ }
+
+ /**
+ * Sets the timeout used when joining each {@link VolumeScanner} thread in
+ * {@link #removeAllVolumeScanners}.
+ *
+ * @param joinScannersTimeOutMs the join timeout in milliseconds.
+ */
+ public void setJoinVolumeScannersTimeOutMs(long joinScannersTimeOutMs) {
+ this.joinVolumeScannersTimeOutMs = joinScannersTimeOutMs;
+ }
+
@InterfaceAudience.Private
public static class Servlet extends HttpServlet {
private static final long serialVersionUID = 1L;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 380343d..1ddd05d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -1661,7 +1661,9 @@ public class DataNode extends ReconfigurableBase
// a block pool id
String bpId = bpos.getBlockPoolId();
- blockScanner.disableBlockPoolId(bpId);
+ if (blockScanner.hasAnyRegisteredScanner()) {
+ blockScanner.disableBlockPoolId(bpId);
+ }
if (data != null) {
data.shutdownBlockPool(bpId);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
index 5e3d523..4728583 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
@@ -670,12 +670,14 @@ public class VolumeScanner extends Thread {
LOG.error("{} exiting because of exception ", this, e);
}
LOG.info("{} exiting.", this);
+ VolumeScannerCBInjector.get().preSavingBlockIteratorTask(this);
// Save the current position of all block iterators and close them.
for (BlockIterator iter : blockIters) {
saveBlockIterator(iter);
IOUtils.cleanup(null, iter);
}
} finally {
+ VolumeScannerCBInjector.get().terminationCallBack(this);
// When the VolumeScanner exits, release the reference we were holding
// on the volume. This will allow the volume to be removed later.
IOUtils.cleanup(null, ref);
@@ -695,6 +697,7 @@ public class VolumeScanner extends Thread {
stopping = true;
notify();
this.interrupt();
+ VolumeScannerCBInjector.get().shutdownCallBack(this);
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScannerCBInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScannerCBInjector.java
new file mode 100644
index 0000000..5798bd1
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScannerCBInjector.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Used for injecting call backs in {@link VolumeScanner}
+ * and {@link BlockScanner} tests.
+ * Calls into this are a no-op in production code.
+ * Tests install a subclass through {@link #set} and override the hooks
+ * they are interested in.
+ */
+@VisibleForTesting
+@InterfaceAudience.Private
+public class VolumeScannerCBInjector {
+ // The injector handed out by get(); starts as the no-op base class.
+ private static VolumeScannerCBInjector instance =
+ new VolumeScannerCBInjector();
+
+ /**
+ * @return the injector most recently installed through {@link #set}, or
+ * the no-op default if none was installed.
+ */
+ public static VolumeScannerCBInjector get() {
+ return instance;
+ }
+
+ /**
+ * Replaces the injector returned by {@link #get}.
+ *
+ * @param injector the injector to install.
+ */
+ public static void set(VolumeScannerCBInjector injector) {
+ instance = injector;
+ }
+
+ /**
+ * Hook invoked by an exiting VolumeScanner just before it saves and closes
+ * its block iterators.
+ *
+ * @param volumeScanner the scanner that is exiting.
+ */
+ public void preSavingBlockIteratorTask(final VolumeScanner volumeScanner) {
+ }
+
+ /**
+ * Hook invoked after a VolumeScanner has been marked as stopping, notified
+ * and interrupted.
+ *
+ * @param volumeScanner the scanner that was asked to stop.
+ */
+ public void shutdownCallBack(final VolumeScanner volumeScanner) {
+ }
+
+ /**
+ * Hook invoked from the finally block of an exiting VolumeScanner, before
+ * it releases its reference on the volume.
+ *
+ * @param volumeScanner the scanner that is terminating.
+ */
+ public void terminationCallBack(final VolumeScanner volumeScanner) {
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 00d5dfd..62b9c5c 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -1597,6 +1597,15 @@
</property>
<property>
+ <name>dfs.block.scanner.volume.join.timeout.ms</name>
+ <value>5000</value>
+ <description>
+ The amount of time in milliseconds that the BlockScanner times out waiting
+ for the VolumeScanner thread to join during a shutdown call.
+ </description>
+</property>
+
+<property>
<name>dfs.datanode.readahead.bytes</name>
<value>4194304</value>
<description>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index a2a9066..ff378d5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -22,6 +22,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.NET_TOPOLOGY_NO
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
@@ -80,6 +81,7 @@ import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
import org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.InMemoryLevelDBAliasMapClient;
+import org.apache.hadoop.hdfs.server.datanode.VolumeScanner;
import org.apache.hadoop.hdfs.server.namenode.ImageServlet;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
@@ -171,6 +173,13 @@ public class MiniDFSCluster implements AutoCloseable {
= DFS_NAMENODE_SAFEMODE_EXTENSION_KEY + ".testing";
public static final String DFS_NAMENODE_DECOMMISSION_INTERVAL_TESTING_KEY
= DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY + ".testing";
+ /**
+ * For the JUnit tests, this is the default amount of time in milliseconds
+ * that the BlockScanner waits for the {@link VolumeScanner} thread to join
+ * during a shutdown call.
+ */
+ public static final long DEFAULT_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC =
+ TimeUnit.SECONDS.toMillis(30);
// Changing this default may break some tests that assume it is 2.
private static final int DEFAULT_STORAGES_PER_DATANODE = 2;
@@ -217,8 +226,7 @@ public class MiniDFSCluster implements AutoCloseable {
public Builder(Configuration conf) {
this.conf = conf;
- this.storagesPerDatanode =
- FsDatasetTestUtils.Factory.getFactory(conf).getDefaultNumOfDataDirs();
+ initDefaultConfigurations();
if (null == conf.get(HDFS_MINIDFS_BASEDIR)) {
conf.set(HDFS_MINIDFS_BASEDIR,
new File(getBaseDirectory()).getAbsolutePath());
@@ -227,8 +235,7 @@ public class MiniDFSCluster implements AutoCloseable {
public Builder(Configuration conf, File basedir) {
this.conf = conf;
- this.storagesPerDatanode =
- FsDatasetTestUtils.Factory.getFactory(conf).getDefaultNumOfDataDirs();
+ initDefaultConfigurations();
if (null == basedir) {
throw new IllegalArgumentException(
"MiniDFSCluster base directory cannot be null");
@@ -492,6 +499,19 @@ public class MiniDFSCluster implements AutoCloseable {
public MiniDFSCluster build() throws IOException {
return new MiniDFSCluster(this);
}
+
+ /**
+ * Initializes default values for the cluster.
+ *
+ * Writes the VolumeScanner join timeout into {@code conf}: a value already
+ * present under DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY is kept,
+ * otherwise DEFAULT_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC is used. Also sets
+ * {@code storagesPerDatanode} from the dataset factory selected by
+ * {@code conf}.
+ */
+ private void initDefaultConfigurations() {
+ // Reading with the test default and writing the result back leaves an
+ // explicitly configured value untouched.
+ long defaultScannerVolumeTimeOut =
+ conf.getLong(DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY,
+ DEFAULT_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC);
+ conf.setLong(DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY,
+ defaultScannerVolumeTimeOut);
+ this.storagesPerDatanode =
+ FsDatasetTestUtils.Factory.getFactory(conf).getDefaultNumOfDataDirs();
+ }
}
/**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java
index 75d1c44..b34b7df 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.datanode;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_SKIP_RECENT_ACCESSED;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND;
import static org.apache.hadoop.hdfs.server.datanode.BlockScanner.Conf.INTERNAL_DFS_DATANODE_SCAN_PERIOD_MS;
@@ -38,6 +39,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
@@ -96,9 +98,19 @@ public class TestBlockScanner {
TestContext(Configuration conf, int numNameServices) throws Exception {
this.numNameServices = numNameServices;
File basedir = new File(GenericTestUtils.getRandomizedTempPath());
+ long volumeScannerTimeOutFromConf =
+ conf.getLong(DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY, -1);
+ long expectedVScannerTimeOut =
+ volumeScannerTimeOutFromConf == -1
+ ? MiniDFSCluster.DEFAULT_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC
+ : volumeScannerTimeOutFromConf;
MiniDFSCluster.Builder bld = new MiniDFSCluster.Builder(conf, basedir).
numDataNodes(1).
storagesPerDatanode(1);
+ // verify that the builder was initialized to get the default
+ // configuration designated for Junit tests.
+ assertEquals(expectedVScannerTimeOut,
+ conf.getLong(DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY, -1));
if (numNameServices > 1) {
bld.nnTopology(MiniDFSNNTopology.
simpleFederatedTopology(numNameServices));
@@ -1012,4 +1024,134 @@ public class TestBlockScanner {
0, info.blocksScanned);
ctx.close();
}
+
+ /**
+ * Test a DN does not wait for the VolumeScanners to finish before shutting
+ * down: the join timeout (50 ms) is shorter than the delay injected into
+ * the VolumeScanner (1000 ms).
+ *
+ * @throws Exception propagated from testDatanodeShutDown.
+ */
+ @Test(timeout=120000)
+ public void testFastDatanodeShutdown() throws Exception {
+ // set the joinTimeOut to a value smaller than the completion time of the
+ // VolumeScanner.
+ testDatanodeShutDown(50L, 1000L, true);
+ }
+
+ /**
+ * Test a DN waits for the VolumeScanners to finish before shutting down:
+ * the join timeout (5 minutes) is longer than the delay injected into the
+ * VolumeScanner (1000 ms).
+ *
+ * @throws Exception propagated from testDatanodeShutDown.
+ */
+ @Test(timeout=120000)
+ public void testSlowDatanodeShutdown() throws Exception {
+ // Set the joinTimeOut to a value larger than the completion time of the
+ // volume scanner
+ testDatanodeShutDown(TimeUnit.MINUTES.toMillis(5), 1000L,
+ false);
+ }
+
+ /**
+ * Shuts down a DataNode whose VolumeScanner is delayed on exit by an
+ * injected callback, and checks the shutdown duration against the
+ * configured join timeout.
+ *
+ * @param joinTimeOutMS value set for
+ * DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY.
+ * @param delayMS delay, in milliseconds, passed to the injected
+ * DelayVolumeScannerResponseToInterrupt callback.
+ * @param isFastShutdown if true, expects
+ * {@code joinTimeOutMS <= shutdown time < delayMS}; otherwise expects
+ * {@code delayMS <= shutdown time < joinTimeOutMS}.
+ * @throws Exception if setting up the test context or waiting on the
+ * scanners fails.
+ */
+ private void testDatanodeShutDown(final long joinTimeOutMS,
+ final long delayMS, boolean isFastShutdown) throws Exception {
+ // Remember the current injector so it can be restored in the finally.
+ VolumeScannerCBInjector prevVolumeScannerCBInject =
+ VolumeScannerCBInjector.get();
+ try {
+ DelayVolumeScannerResponseToInterrupt injectDelay =
+ new DelayVolumeScannerResponseToInterrupt(delayMS);
+ VolumeScannerCBInjector.set(injectDelay);
+ Configuration conf = new Configuration();
+ conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 100L);
+ conf.set(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER,
+ TestScanResultHandler.class.getName());
+ conf.setLong(INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS, 0L);
+ conf.setLong(DFS_BLOCK_SCANNER_VOLUME_JOIN_TIMEOUT_MSEC_KEY,
+ joinTimeOutMS);
+ // NOTE(review): ctx is never closed in this method, while another test
+ // in this class calls ctx.close() - confirm whether cleanup is needed.
+ final TestContext ctx = new TestContext(conf, 1);
+ final int numExpectedBlocks = 10;
+ ctx.createFiles(0, numExpectedBlocks, 1);
+ final TestScanResultHandler.Info info =
+ TestScanResultHandler.getInfo(ctx.volumes.get(0));
+ synchronized (info) {
+ info.sem = new Semaphore(5);
+ info.shouldRun = true;
+ info.notify();
+ }
+ // make sure that the scanners are making progress
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ synchronized (info) {
+ return info.blocksScanned >= 1;
+ }
+ }
+ }, 3, 30000);
+ // mark the time when the datanode shutdown starts
+ long startShutdownTime = Time.monotonicNow();
+ ctx.datanode.shutdown();
+ long endShutdownTime = Time.monotonicNow();
+ long totalTimeShutdown = endShutdownTime - startShutdownTime;
+
+ if (isFastShutdown) {
+ assertTrue("total shutdown time of DN must be smaller than "
+ + "VolumeScanner Response time: " + totalTimeShutdown,
+ totalTimeShutdown < delayMS
+ && totalTimeShutdown >= joinTimeOutMS);
+ // wait for scanners to terminate before we move to the next test.
+ injectDelay.waitForScanners();
+ return;
+ }
+ assertTrue("total shutdown time of DN must be larger than " +
+ "VolumeScanner Response time: " + totalTimeShutdown,
+ totalTimeShutdown >= delayMS
+ && totalTimeShutdown < joinTimeOutMS);
+ } finally {
+ // restore the VolumeScanner callback injector.
+ VolumeScannerCBInjector.set(prevVolumeScannerCBInject);
+ }
+ }
+
+ /**
+ * Test injector that makes each VolumeScanner busy-wait for a fixed delay
+ * before it saves its block iterators, and tracks which scanners have been
+ * asked to shut down but have not yet terminated.
+ */
+ private static class DelayVolumeScannerResponseToInterrupt extends
+ VolumeScannerCBInjector {
+ // Length of the injected busy-wait, in nanoseconds.
+ final private long delayAmountNS;
+ // Scanners that received a shutdown request and have not terminated yet.
+ final private Set<VolumeScanner> scannersToShutDown;
+
+ /**
+ * @param delayMS how long, in milliseconds, each scanner is held up in
+ * {@link #preSavingBlockIteratorTask}.
+ */
+ DelayVolumeScannerResponseToInterrupt(long delayMS) {
+ delayAmountNS = TimeUnit.MILLISECONDS.toNanos(delayMS);
+ scannersToShutDown = ConcurrentHashMap.newKeySet();
+ }
+
+ /**
+ * Spins until the configured delay has elapsed, then logs the time that
+ * was actually spent waiting, in milliseconds.
+ *
+ * @param volumeScanner the scanner being delayed.
+ */
+ @Override
+ public void preSavingBlockIteratorTask(VolumeScanner volumeScanner) {
+ // busy delay without sleep().
+ long startTime = Time.monotonicNowNanos();
+ long endTime = startTime + delayAmountNS;
+ while (Time.monotonicNowNanos() < endTime) {
+ // empty loop. No need to sleep because the thread could be in an
+ // interrupt mode.
+ }
+ // Measure after the loop so the full elapsed time is reported.
+ long waitTimeNS = Time.monotonicNowNanos() - startTime;
+ // waitTimeNS is in nanoseconds; convert it to milliseconds for the log.
+ LOG.info("VolumeScanner {} finished delayed Task after {} ms",
+ volumeScanner.toString(),
+ TimeUnit.NANOSECONDS.toMillis(waitTimeNS));
+ }
+
+ /**
+ * Records the scanner as pending termination.
+ *
+ * @param volumeScanner the scanner that was asked to stop.
+ */
+ @Override
+ public void shutdownCallBack(VolumeScanner volumeScanner) {
+ scannersToShutDown.add(volumeScanner);
+ }
+
+ /**
+ * Removes the scanner from the set of scanners pending termination.
+ *
+ * @param volumeScanner the scanner that is terminating.
+ */
+ @Override
+ public void terminationCallBack(VolumeScanner volumeScanner) {
+ scannersToShutDown.remove(volumeScanner);
+ }
+
+ /**
+ * Polls every 10 ms, for up to 120 seconds, until every scanner that was
+ * asked to shut down has terminated.
+ *
+ * @throws TimeoutException if scanners are still pending after the wait.
+ * @throws InterruptedException if the waiting thread is interrupted.
+ */
+ public void waitForScanners() throws TimeoutException,
+ InterruptedException {
+ GenericTestUtils.waitFor(
+ () -> scannersToShutDown.isEmpty(), 10, 120000);
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org