You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ki...@apache.org on 2020/02/27 15:45:45 UTC
[hadoop] branch branch-3.1 updated: HDFS-15147. LazyPersistTestCase
wait logic is error-prone. Contributed by Ahmed Hussein.
This is an automated email from the ASF dual-hosted git repository.
kihwal pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new b92477c HDFS-15147. LazyPersistTestCase wait logic is error-prone. Contributed by Ahmed Hussein.
b92477c is described below
commit b92477c638c9b9235868bfd13518e36545139c90
Author: Kihwal Lee <ki...@apache.org>
AuthorDate: Thu Feb 27 09:45:12 2020 -0600
HDFS-15147. LazyPersistTestCase wait logic is error-prone. Contributed
by Ahmed Hussein.
(cherry picked from commit 27cfda708ef66dfbe5f52a5f1e716298a294f3f7)
Conflicts:
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java
---
.../java/org/apache/hadoop/util/ThreadUtil.java | 28 ++
.../org/apache/hadoop/test/GenericTestUtils.java | 15 +-
.../hdfs/server/blockmanagement/BlockManager.java | 21 +-
.../hadoop/hdfs/server/namenode/FSNamesystem.java | 31 +-
.../java/org/apache/hadoop/hdfs/DFSTestUtil.java | 14 +-
.../fsdataset/impl/LazyPersistTestCase.java | 356 ++++++++++++++++-----
.../fsdataset/impl/TestLazyPersistFiles.java | 69 ++--
.../impl/TestLazyPersistReplicaPlacement.java | 2 +-
.../datanode/fsdataset/impl/TestLazyWriter.java | 6 +-
9 files changed, 402 insertions(+), 140 deletions(-)
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ThreadUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ThreadUtil.java
index 2cda8a4..f9ea3fc 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ThreadUtil.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ThreadUtil.java
@@ -50,6 +50,34 @@ public class ThreadUtil {
}
/**
+ * Joins a thread, ignoring interrupts while waiting.
+ * The call continues to block until {@code toJoin} terminates, even if the
+ * calling thread is interrupted.
+ * Any {@link InterruptedException} is logged, and the calling thread's
+ * interrupt status is restored before the method returns.
+ *
+ * @param toJoin the thread to join on.
+ */
+ public static void joinUninterruptibly(Thread toJoin) {
+ boolean interrupted = false;
+ try {
+ while (true) {
+ try {
+ toJoin.join();
+ return;
+ } catch (InterruptedException e) {
+ interrupted = true;
+ LOG.warn("interrupted while sleeping", e);
+ }
+ }
+ } finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ /**
* Convenience method that returns a resource as inputstream from the
* classpath.
* <p>
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
index 5479907..ba5644f 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
@@ -60,7 +60,6 @@ import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.Sets;
@@ -369,11 +368,15 @@ public abstract class GenericTestUtils {
* time
* @throws InterruptedException if the method is interrupted while waiting
*/
- public static void waitFor(Supplier<Boolean> check, int checkEveryMillis,
- int waitForMillis) throws TimeoutException, InterruptedException {
- Preconditions.checkNotNull(check, ERROR_MISSING_ARGUMENT);
- Preconditions.checkArgument(waitForMillis >= checkEveryMillis,
- ERROR_INVALID_ARGUMENT);
+ public static void waitFor(final Supplier<Boolean> check,
+ final long checkEveryMillis, final long waitForMillis)
+ throws TimeoutException, InterruptedException {
+ if (check == null) {
+ throw new NullPointerException(ERROR_MISSING_ARGUMENT);
+ }
+ if (waitForMillis < checkEveryMillis) {
+ throw new IllegalArgumentException(ERROR_INVALID_ARGUMENT);
+ }
long st = Time.monotonicNow();
boolean result = check.get();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 34c6fd1..17d5603 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -48,6 +48,8 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
+
+import java.util.concurrent.atomic.AtomicLong;
import javax.management.ObjectName;
import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -321,7 +323,12 @@ public class BlockManager implements BlockStatsMXBean {
/** Redundancy thread. */
private final Daemon redundancyThread = new Daemon(new RedundancyMonitor());
-
+ /**
+ * Timestamp marking the end time of {@link #redundancyThread}'s full cycle.
+ * The value is -1 until the first cycle completes; JUnit tests check it to
+ * verify that the {@link #redundancyThread} has run at least one full iteration.
+ */
+ private final AtomicLong lastRedundancyCycleTS = new AtomicLong(-1);
/** StorageInfoDefragmenter thread. */
private final Daemon storageInfoDefragmenterThread =
new Daemon(new StorageInfoDefragmenter());
@@ -4781,6 +4788,17 @@ public class BlockManager implements BlockStatsMXBean {
}
/**
+ * Returns the timestamp of the last full cycle of
+ * {@link #redundancyThread}. JUnit tests poll this value to block until
+ * {@link #lastRedundancyCycleTS} is updated.
+ * @return the current {@link #lastRedundancyCycleTS} (initially -1).
+ */
+ @VisibleForTesting
+ public long getLastRedundancyMonitorTS() {
+ return lastRedundancyCycleTS.get();
+ }
+
+ /**
* Periodically calls computeBlockRecoveryWork().
*/
private class RedundancyMonitor implements Runnable {
@@ -4794,6 +4812,7 @@ public class BlockManager implements BlockStatsMXBean {
computeDatanodeWork();
processPendingReconstructions();
rescanPostponedMisreplicatedBlocks();
+ lastRedundancyCycleTS.set(Time.monotonicNow());
}
TimeUnit.MILLISECONDS.sleep(redundancyRecheckIntervalMs);
} catch (Throwable t) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 474da2f..59a380c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -89,6 +89,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SNAPSHOT_DIFF_LISTING_LIMIT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SNAPSHOT_DIFF_LISTING_LIMIT_DEFAULT;
+
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import static org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.*;
import static org.apache.hadoop.ha.HAServiceProtocol.HAServiceState.ACTIVE;
@@ -113,7 +115,6 @@ import java.io.DataInput;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
@@ -308,6 +309,7 @@ import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.VersionInfo;
import org.apache.log4j.Appender;
import org.apache.log4j.AsyncAppender;
@@ -486,7 +488,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
// A daemon to periodically clean up corrupt lazyPersist files
// from the name space.
Daemon lazyPersistFileScrubber = null;
-
+ /**
+ * Timestamp marking the end time of {@link #lazyPersistFileScrubber}'s full
+ * cycle. This value can be checked by the JUnit tests to verify that the
+ * {@link #lazyPersistFileScrubber} has run at least one full iteration.
+ */
+ private final AtomicLong lazyPersistFileScrubberTS = new AtomicLong(0);
// Executor to warm up EDEK cache
private ExecutorService edekCacheLoader = null;
private final int edekCacheLoaderDelay;
@@ -645,6 +652,20 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
return leaseManager;
}
+ /**
+ * Returns the timestamp of the last full cycle of the {@link
+ * #lazyPersistFileScrubber} daemon. JUnit tests poll this value to block
+ * until {@link #lazyPersistFileScrubberTS} is updated.
+ *
+ * @return the current {@link #lazyPersistFileScrubberTS} if {@link
+ * #lazyPersistFileScrubber} is not null; -1 otherwise.
+ */
+ @VisibleForTesting
+ public long getLazyPersistFileScrubberTS() {
+ return lazyPersistFileScrubber == null ? -1
+ : lazyPersistFileScrubberTS.get();
+ }
+
public boolean isHaEnabled() {
return haEnabled;
}
@@ -4116,10 +4137,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
try {
if (!isInSafeMode()) {
clearCorruptLazyPersistFiles();
+ // set the timeStamp of last Cycle.
+ lazyPersistFileScrubberTS.set(Time.monotonicNow());
} else {
if (FSNamesystem.LOG.isDebugEnabled()) {
- FSNamesystem.LOG
- .debug("Namenode is in safemode, skipping scrubbing of corrupted lazy-persist files.");
+ FSNamesystem.LOG.debug("Namenode is in safemode, skipping "
+ + "scrubbing of corrupted lazy-persist files.");
}
}
} catch (Exception e) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 3d0d882..2e26e5f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -2000,6 +2000,15 @@ public class DFSTestUtil {
GenericTestUtils.setLogLevel(NameNode.blockStateChangeLog, level);
}
+ public static void setNameNodeLogLevel(org.slf4j.event.Level level) {
+ GenericTestUtils.setLogLevel(FSNamesystem.LOG, level);
+ GenericTestUtils.setLogLevel(BlockManager.LOG, level);
+ GenericTestUtils.setLogLevel(LeaseManager.LOG, level);
+ GenericTestUtils.setLogLevel(NameNode.LOG, level);
+ GenericTestUtils.setLogLevel(NameNode.stateChangeLog, level);
+ GenericTestUtils.setLogLevel(NameNode.blockStateChangeLog, level);
+ }
+
/**
* Get the NamenodeProtocol RPC proxy for the NN associated with this
* DFSClient object
@@ -2282,15 +2291,12 @@ public class DFSTestUtil {
public Boolean get() {
try {
final int currentValue = Integer.parseInt(jmx.getValue(metricName));
- LOG.info("Waiting for " + metricName +
- " to reach value " + expectedValue +
- ", current value = " + currentValue);
return currentValue == expectedValue;
} catch (Exception e) {
throw new UnhandledException("Test failed due to unexpected exception", e);
}
}
- }, 1000, 60000);
+ }, 50, 60000);
}
/**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java
index aae59dd..52fabf3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java
@@ -17,7 +17,11 @@
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
-import com.google.common.base.Supplier;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import static org.apache.hadoop.fs.CreateFlag.CREATE;
@@ -45,6 +49,14 @@ import com.google.common.base.Preconditions;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -68,10 +80,13 @@ import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Rule;
import org.junit.rules.Timeout;
+import org.slf4j.event.Level;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
public abstract class LazyPersistTestCase {
static final byte LAZY_PERSIST_POLICY_ID = (byte) 15;
@@ -81,16 +96,33 @@ public abstract class LazyPersistTestCase {
GenericTestUtils.setLogLevel(FsDatasetImpl.LOG, Level.DEBUG);
}
+ protected static final Logger LOG =
+ LoggerFactory.getLogger(LazyPersistTestCase.class);
protected static final int BLOCK_SIZE = 5 * 1024 * 1024;
protected static final int BUFFER_LENGTH = 4096;
- private static final long HEARTBEAT_INTERVAL_SEC = 1;
- private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
- private static final String JMX_RAM_DISK_METRICS_PATTERN = "^RamDisk";
- private static final String JMX_SERVICE_NAME = "DataNode";
protected static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3;
protected static final int LAZY_WRITER_INTERVAL_SEC = 1;
- protected static final Log LOG = LogFactory.getLog(LazyPersistTestCase.class);
protected static final short REPL_FACTOR = 1;
+ private static final String JMX_RAM_DISK_METRICS_PATTERN = "^RamDisk";
+ private static final String JMX_SERVICE_NAME = "DataNode";
+ private static final long HEARTBEAT_INTERVAL_SEC = 1;
+ private static final int HEARTBEAT_RECHECK_INTERVAL_MS = 500;
+ private static final long WAIT_FOR_FBR_MS =
+ TimeUnit.SECONDS.toMillis(10);
+ private static final long WAIT_FOR_STORAGE_TYPES_MS =
+ TimeUnit.SECONDS.toMillis(30);
+ private static final long WAIT_FOR_ASYNC_DELETE_MS =
+ TimeUnit.SECONDS.toMillis(10);
+ private static final long WAIT_FOR_DN_SHUTDOWN_MS =
+ TimeUnit.SECONDS.toMillis(30);
+ private static final long WAIT_FOR_REDUNDANCY_MS =
+ TimeUnit.SECONDS
+ .toMillis(2 * DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_DEFAULT);
+ private static final long WAIT_FOR_LAZY_SCRUBBER_MS =
+ TimeUnit.SECONDS.toMillis(2 * LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC);
+ private static final long WAIT_POLL_INTERVAL_MS = 10;
+ private static final long WAIT_POLL_INTERVAL_LARGE_MS = 20;
+
protected final long osPageSize =
NativeIO.POSIX.getCacheManipulator().getOperatingSystemPageSize();
@@ -133,76 +165,79 @@ public abstract class LazyPersistTestCase {
Path path, StorageType storageType)
throws IOException, TimeoutException, InterruptedException {
// Ensure that returned block locations returned are correct!
- LOG.info("Ensure path: " + path + " is on StorageType: " + storageType);
+ LOG.info("Ensure path: {} is on StorageType: {}", path, storageType);
assertThat(fs.exists(path), is(true));
long fileLength = client.getFileInfo(path.toString()).getLen();
- GenericTestUtils.waitFor(new Supplier<Boolean>() {
- @Override
- public Boolean get() {
- try {
- LocatedBlocks locatedBlocks =
- client.getLocatedBlocks(path.toString(), 0, fileLength);
- for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
- if (locatedBlock.getStorageTypes()[0] != storageType) {
- return false;
- }
+ GenericTestUtils.waitFor(() -> {
+ try {
+ LocatedBlocks locatedBlocks =
+ client.getLocatedBlocks(path.toString(), 0, fileLength);
+ for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
+ if (locatedBlock.getStorageTypes()[0] != storageType) {
+ return false;
}
- return true;
- } catch (IOException ioe) {
- LOG.warn("Exception got in ensureFileReplicasOnStorageType()", ioe);
- return false;
}
+ return true;
+ } catch (IOException ioe) {
+ LOG.warn("Exception got in ensureFileReplicasOnStorageType()", ioe);
+ return false;
}
- }, 100, 30 * 1000);
+ }, WAIT_POLL_INTERVAL_MS, WAIT_FOR_STORAGE_TYPES_MS);
return client.getLocatedBlocks(path.toString(), 0, fileLength);
}
/**
- * Make sure at least one non-transient volume has a saved copy of the replica.
- * An infinite loop is used to ensure the async lazy persist tasks are completely
- * done before verification. Caller of ensureLazyPersistBlocksAreSaved expects
- * either a successful pass or timeout failure.
+ * Make sure at least one non-transient volume has a saved copy of the
+ * replica. The volumes are polled until the async lazy persist tasks have
+ * produced a saved copy for every block, or until the wait times out.
+ * Callers of this method expect either a successful pass or a timeout failure.
+ *
+ * @param locatedBlocks the collection of blocks and their locations.
+ * @throws IOException from the auto-closeable volume references.
+ * @throws InterruptedException if the thread is interrupted.
+ * @throws TimeoutException if {@link #WAIT_FOR_STORAGE_TYPES_MS} expires
+ * before we find a persisted copy for each located
+ * block.
*/
protected final void ensureLazyPersistBlocksAreSaved(
- LocatedBlocks locatedBlocks) throws IOException, InterruptedException {
+ final LocatedBlocks locatedBlocks)
+ throws IOException, InterruptedException, TimeoutException {
final String bpid = cluster.getNamesystem().getBlockPoolId();
final Set<Long> persistedBlockIds = new HashSet<Long>();
-
+ // We should find a persisted copy for each located block.
try (FsDatasetSpi.FsVolumeReferences volumes =
cluster.getDataNodes().get(0).getFSDataset().getFsVolumeReferences()) {
- while (persistedBlockIds.size() < locatedBlocks.getLocatedBlocks()
- .size()) {
- // Take 1 second sleep before each verification iteration
- Thread.sleep(1000);
-
+ GenericTestUtils.waitFor(() -> {
for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
for (FsVolumeSpi v : volumes) {
if (v.isTransientStorage()) {
continue;
}
-
FsVolumeImpl volume = (FsVolumeImpl) v;
- File lazyPersistDir =
- volume.getBlockPoolSlice(bpid).getLazypersistDir();
-
+ File lazyPersistDir;
+ try {
+ lazyPersistDir =
+ volume.getBlockPoolSlice(bpid).getLazypersistDir();
+ } catch (IOException ioe) {
+ return false;
+ }
long blockId = lb.getBlock().getBlockId();
File targetDir =
DatanodeUtil.idToBlockDir(lazyPersistDir, blockId);
File blockFile = new File(targetDir, lb.getBlock().getBlockName());
if (blockFile.exists()) {
- // Found a persisted copy for this block and added to the Set
+ // Found a persisted copy for this block and added to the Set.
persistedBlockIds.add(blockId);
}
}
}
- }
+ return (persistedBlockIds.size() ==
+ locatedBlocks.getLocatedBlocks().size());
+ }, WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_STORAGE_TYPES_MS);
}
-
- // We should have found a persisted copy for each located block.
- assertThat(persistedBlockIds.size(), is(locatedBlocks.getLocatedBlocks().size()));
}
protected final void makeRandomTestFile(Path path, long length,
@@ -271,7 +306,7 @@ public abstract class LazyPersistTestCase {
}
conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL_SEC);
conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
- HEARTBEAT_RECHECK_INTERVAL_MSEC);
+ HEARTBEAT_RECHECK_INTERVAL_MS);
conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
LAZY_WRITER_INTERVAL_SEC);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY, 1);
@@ -334,18 +369,18 @@ public abstract class LazyPersistTestCase {
@Override
public void mlock(String identifier,
ByteBuffer mmap, long length) throws IOException {
- LOG.info("LazyPersistTestCase: faking mlock of " + identifier + " bytes.");
+ LOG.info("LazyPersistTestCase: faking mlock of {} bytes.", identifier);
}
@Override
public long getMemlockLimit() {
- LOG.info("LazyPersistTestCase: fake return " + Long.MAX_VALUE);
+ LOG.info("LazyPersistTestCase: fake return {}", Long.MAX_VALUE);
return Long.MAX_VALUE;
}
@Override
public boolean verifyCanMlock() {
- LOG.info("LazyPersistTestCase: fake return " + true);
+ LOG.info("LazyPersistTestCase: fake return {}", true);
return true;
}
});
@@ -413,8 +448,10 @@ public abstract class LazyPersistTestCase {
public void build() throws IOException {
LazyPersistTestCase.this.startUpCluster(
- numDatanodes, hasTransientStorage, storageTypes, ramDiskReplicaCapacity,
- ramDiskStorageLimit, maxLockedMemory, useScr, useLegacyBlockReaderLocal,
+ numDatanodes, hasTransientStorage, storageTypes,
+ ramDiskReplicaCapacity,
+ ramDiskStorageLimit, maxLockedMemory, useScr,
+ useLegacyBlockReaderLocal,
disableScrubber);
}
@@ -429,11 +466,44 @@ public abstract class LazyPersistTestCase {
private boolean disableScrubber=false;
}
+ /**
+ * Forces a full block report on all the datanodes. The call blocks until
+ * the first storage of every datanode shows a new block report count.
+ *
+ * @throws IOException if an exception is thrown while getting the datanode
+ * descriptors or triggering the block reports.
+ * @throws InterruptedException if the thread receives an interrupt.
+ * @throws TimeoutException if the reports are not received within
+ * {@link #WAIT_FOR_FBR_MS}.
+ */
protected final void triggerBlockReport()
- throws IOException, InterruptedException {
+ throws InterruptedException, TimeoutException, IOException {
// Trigger block report to NN
- DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0));
- Thread.sleep(10 * 1000);
+ final Map<DatanodeStorageInfo, Integer> reportCountsBefore =
+ new HashMap<>();
+ final FSNamesystem fsn = cluster.getNamesystem();
+ for (DataNode dn : cluster.getDataNodes()) {
+ final DatanodeDescriptor dnd =
+ NameNodeAdapter.getDatanode(fsn, dn.getDatanodeId());
+ final DatanodeStorageInfo storage = dnd.getStorageInfos()[0];
+ reportCountsBefore.put(storage, storage.getBlockReportCount());
+ DataNodeTestUtils.triggerBlockReport(dn);
+ }
+ // wait for block reports to be received.
+ GenericTestUtils.waitFor(() -> {
+ for (Entry<DatanodeStorageInfo, Integer> reportEntry :
+ reportCountsBefore.entrySet()) {
+ final DatanodeStorageInfo dnStorageInfo = reportEntry.getKey();
+ final int cntBefore = reportEntry.getValue();
+ final int currentCnt = dnStorageInfo.getBlockReportCount();
+ if (cntBefore == currentCnt) {
+ // Same count means no report has been received.
+ return false;
+ }
+ }
+ // If we reach here, then all the block reports have been received.
+ return true;
+ }, WAIT_POLL_INTERVAL_MS, WAIT_FOR_FBR_MS);
}
protected final boolean verifyBlockDeletedFromDir(File dir,
@@ -445,51 +515,58 @@ public abstract class LazyPersistTestCase {
File blockFile = new File(targetDir, lb.getBlock().getBlockName());
if (blockFile.exists()) {
- LOG.warn("blockFile: " + blockFile.getAbsolutePath() +
- " exists after deletion.");
return false;
}
File metaFile = new File(targetDir,
DatanodeUtil.getMetaName(lb.getBlock().getBlockName(),
lb.getBlock().getGenerationStamp()));
if (metaFile.exists()) {
- LOG.warn("metaFile: " + metaFile.getAbsolutePath() +
- " exists after deletion.");
return false;
}
}
return true;
}
- protected final boolean verifyDeletedBlocks(LocatedBlocks locatedBlocks)
- throws IOException, InterruptedException {
+ protected final boolean verifyDeletedBlocks(final LocatedBlocks locatedBlocks)
+ throws Exception {
LOG.info("Verifying replica has no saved copy after deletion.");
triggerBlockReport();
+ final DataNode dn = cluster.getDataNodes().get(0);
- while(
- cluster.getFsDatasetTestUtils(0).getPendingAsyncDeletions()
- > 0L){
- Thread.sleep(1000);
- }
+ GenericTestUtils.waitFor(() -> {
+ for (DataNode dn1 : cluster.getDataNodes()) {
+ if (cluster.getFsDatasetTestUtils(dn1).getPendingAsyncDeletions()
+ > 0) {
+ return false;
+ }
+ }
+ return true;
+ }, WAIT_POLL_INTERVAL_MS, WAIT_FOR_ASYNC_DELETE_MS);
final String bpid = cluster.getNamesystem().getBlockPoolId();
- final FsDatasetSpi<?> dataset =
- cluster.getDataNodes().get(0).getFSDataset();
-
+ final FsDatasetSpi<?> dataset = dn.getFSDataset();
// Make sure deleted replica does not have a copy on either finalized dir of
- // transient volume or finalized dir of non-transient volume
+ // transient volume or finalized dir of non-transient volume.
+ // We need to poll until the async deletion has removed the replica files.
try (FsDatasetSpi.FsVolumeReferences volumes =
dataset.getFsVolumeReferences()) {
- for (FsVolumeSpi vol : volumes) {
- FsVolumeImpl volume = (FsVolumeImpl) vol;
- File targetDir = (volume.isTransientStorage()) ?
- volume.getBlockPoolSlice(bpid).getFinalizedDir() :
- volume.getBlockPoolSlice(bpid).getLazypersistDir();
- if (verifyBlockDeletedFromDir(targetDir, locatedBlocks) == false) {
+ GenericTestUtils.waitFor(() -> {
+ try {
+ for (FsVolumeSpi vol : volumes) {
+ FsVolumeImpl volume = (FsVolumeImpl) vol;
+ File targetDir = (volume.isTransientStorage()) ?
+ volume.getBlockPoolSlice(bpid).getFinalizedDir() :
+ volume.getBlockPoolSlice(bpid).getLazypersistDir();
+ if (!verifyBlockDeletedFromDir(targetDir, locatedBlocks)) {
+ return false;
+ }
+ }
+ return true;
+ } catch (IOException ie) {
return false;
}
- }
+ }, WAIT_POLL_INTERVAL_MS, WAIT_FOR_ASYNC_DELETE_MS);
}
return true;
}
@@ -530,8 +607,137 @@ public abstract class LazyPersistTestCase {
DFSTestUtil.waitForMetric(jmx, metricName, expectedValue);
}
- protected void triggerEviction(DataNode dn) {
+ protected void triggerEviction(final DataNode dn) {
FsDatasetImpl fsDataset = (FsDatasetImpl) dn.getFSDataset();
fsDataset.evictLazyPersistBlocks(Long.MAX_VALUE); // Run one eviction cycle.
}
+
+ /**
+ * Shuts down all datanodes in {@link #cluster}. The call blocks for up to
+ * {@link #WAIT_FOR_DN_SHUTDOWN_MS} until the client's datanode report
+ * lists no live datanodes.
+ *
+ * @throws TimeoutException if {@link #WAIT_FOR_DN_SHUTDOWN_MS} expires with
+ * at least one datanode still alive.
+ * @throws InterruptedException if the thread receives an interrupt.
+ */
+ protected void shutdownDataNodes()
+ throws TimeoutException, InterruptedException {
+ cluster.shutdownDataNodes();
+ GenericTestUtils.waitFor(() -> {
+ try {
+ DatanodeInfo[] info = client.datanodeReport(
+ HdfsConstants.DatanodeReportType.LIVE);
+ return info.length == 0;
+ } catch (IOException e) {
+ return false;
+ }
+ }, WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_DN_SHUTDOWN_MS);
+ }
+
+ /**
+ * Blocks for up to {@link #WAIT_FOR_REDUNDANCY_MS} until the number of
+ * blocks with corrupt replicas equals {@code corruptCnt}.
+ *
+ * @param corruptCnt the expected number of blocks with corrupt replicas
+ * before resuming.
+ * @throws TimeoutException if {@link #WAIT_FOR_REDUNDANCY_MS} expires
+ * before the corrupt block count reaches {@code corruptCnt}.
+ * @throws InterruptedException if the thread receives an interrupt.
+ */
+ protected void waitForCorruptBlock(final long corruptCnt)
+ throws TimeoutException, InterruptedException {
+ // wait for the redundancy monitor to mark the file as corrupt.
+ GenericTestUtils.waitFor(() -> {
+ Iterator<BlockInfo> bInfoIter = cluster.getNameNode()
+ .getNamesystem().getBlockManager().getCorruptReplicaBlockIterator();
+ int count = 0;
+ while (bInfoIter.hasNext()) {
+ bInfoIter.next();
+ count++;
+ }
+ return corruptCnt == count;
+ }, 2 * WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_REDUNDANCY_MS);
+ }
+
+ /**
+ * Blocks until {@link FSNamesystem#lazyPersistFileScrubber} daemon completes
+ * a full iteration.
+ *
+ * @throws InterruptedException if the thread receives an interrupt.
+ * @throws TimeoutException
+ * {@link FSNamesystem#getLazyPersistFileScrubberTS()}
+ * does not update the timestamp by
+ * {@link #WAIT_FOR_LAZY_SCRUBBER_MS}.
+ */
+ protected void waitForScrubberCycle()
+ throws TimeoutException, InterruptedException {
+ // wait for the lazy persist file scrubber to complete a full cycle.
+ final FSNamesystem fsn = cluster.getNamesystem();
+ final long lastTimeStamp = fsn.getLazyPersistFileScrubberTS();
+ if (lastTimeStamp == -1) { // scrubber is disabled
+ return;
+ }
+ GenericTestUtils.waitFor(
+ () -> lastTimeStamp != fsn.getLazyPersistFileScrubberTS(),
+ 2 * WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_LAZY_SCRUBBER_MS);
+ }
+
+ /**
+ * Blocks until {@link BlockManager#RedundancyMonitor} daemon completes
+ * a full iteration.
+ *
+ * @throws InterruptedException if the thread receives an interrupt.
+ * @throws TimeoutException {@link BlockManager#getLastRedundancyMonitorTS()}
+ * does not update the timestamp by
+ * {@link #WAIT_FOR_REDUNDANCY_MS}.
+ */
+ protected void waitForRedundancyMonitorCycle()
+ throws TimeoutException, InterruptedException {
+ // wait for the redundancy monitor to complete a full cycle.
+ final BlockManager bm = cluster.getNamesystem().getBlockManager();
+ final long lastRedundancyTS =
+ bm.getLastRedundancyMonitorTS();
+
+ GenericTestUtils.waitFor(
+ () -> lastRedundancyTS != bm.getLastRedundancyMonitorTS(),
+ 2 * WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_REDUNDANCY_MS);
+ }
+
+ /**
+ * Blocks until {@link BlockManager#getLowRedundancyBlocksCount()} equals
+ * {@code cnt}.
+ *
+ * @throws InterruptedException if the thread receives an interrupt.
+ * @throws TimeoutException {@link BlockManager#getLowRedundancyBlocksCount()}
+ * does not update the count by
+ * {@link #WAIT_FOR_REDUNDANCY_MS}.
+ */
+ protected void waitForLowRedundancyCount(final long cnt)
+ throws TimeoutException, InterruptedException {
+ final BlockManager bm = cluster.getNamesystem().getBlockManager();
+
+ GenericTestUtils.waitFor(() -> cnt == bm.getLowRedundancyBlocksCount(),
+ 2 * WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_REDUNDANCY_MS);
+ }
+
+ /**
+ * Blocks until the existence of {@code path} matches {@code expected}.
+ *
+ * @param path of the file to be checked.
+ * @param expected whether a file should exist or not.
+ * @throws TimeoutException if the file's existence does not match within
+ * {@link #WAIT_FOR_STORAGE_TYPES_MS}.
+ * @throws InterruptedException if the thread receives an interrupt.
+ */
+ protected void waitForFile(final Path path, final boolean expected)
+ throws TimeoutException, InterruptedException {
+ GenericTestUtils.waitFor(() -> {
+ try {
+ return expected == fs.exists(path);
+ } catch (IOException e) {
+ return false;
+ }
+ }, WAIT_POLL_INTERVAL_MS, WAIT_FOR_STORAGE_TYPES_MS);
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
index 04f8127..c0b4b17 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
@@ -16,11 +16,10 @@
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
-import com.google.common.collect.Iterators;
-import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.ThreadUtil;
import org.junit.Assert;
import org.junit.Test;
@@ -33,7 +32,6 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.hadoop.fs.StorageType.RAM_DISK;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
@@ -76,7 +74,7 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
makeTestFile(path, BLOCK_SIZE, true);
try {
- client.truncate(path.toString(), BLOCK_SIZE/2);
+ client.truncate(path.toString(), BLOCK_SIZE / 2);
fail("Truncate to LazyPersist file did not fail as expected");
} catch (Throwable t) {
LOG.info("Got expected exception ", t);
@@ -98,28 +96,20 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
makeTestFile(path1, BLOCK_SIZE, true);
ensureFileReplicasOnStorageType(path1, RAM_DISK);
- // Stop the DataNode and sleep for the time it takes the NN to
- // detect the DN as being dead.
- cluster.shutdownDataNodes();
- Thread.sleep(30000L);
+ // Stop the DataNode.
+ shutdownDataNodes();
assertThat(cluster.getNamesystem().getNumDeadDataNodes(), is(1));
// Next, wait for the redundancy monitor to mark the file as corrupt.
- Thread.sleep(2 * DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_DEFAULT * 1000);
-
+ waitForRedundancyMonitorCycle();
// Wait for the LazyPersistFileScrubber to run
- Thread.sleep(2 * LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC * 1000);
-
+ waitForScrubberCycle();
// Ensure that path1 does not exist anymore, whereas path2 does.
- assert(!fs.exists(path1));
+ waitForFile(path1, false);
// We should have zero blocks that needs replication i.e. the one
- // belonging to path2.
- assertThat(cluster.getNameNode()
- .getNamesystem()
- .getBlockManager()
- .getLowRedundancyBlocksCount(),
- is(0L));
+ // belonging to path2. This needs a wait.
+ waitForLowRedundancyCount(0L);
}
@Test
@@ -134,18 +124,14 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
// Stop the DataNode and sleep for the time it takes the NN to
// detect the DN as being dead.
- cluster.shutdownDataNodes();
- Thread.sleep(30000L);
-
- // Next, wait for the redundancy monitor to mark the file as corrupt.
- Thread.sleep(2 * DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_DEFAULT * 1000);
+ shutdownDataNodes();
+ // wait for the redundancy monitor to mark the file as corrupt.
+ waitForCorruptBlock(1L);
// Wait for the LazyPersistFileScrubber to run
- Thread.sleep(2 * LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC * 1000);
-
+ waitForScrubberCycle();
// Ensure that path1 exist.
- Assert.assertTrue(fs.exists(path1));
-
+ waitForFile(path1, true);
}
/**
@@ -160,20 +146,14 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
makeTestFile(path1, BLOCK_SIZE, true);
ensureFileReplicasOnStorageType(path1, RAM_DISK);
- cluster.shutdownDataNodes();
+ shutdownDataNodes();
cluster.restartNameNodes();
// wait for the redundancy monitor to mark the file as corrupt.
- Long corruptBlkCount;
- do {
- Thread.sleep(DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_DEFAULT * 1000);
- corruptBlkCount = (long) Iterators.size(cluster.getNameNode()
- .getNamesystem().getBlockManager().getCorruptReplicaBlockIterator());
- } while (corruptBlkCount != 1L);
-
+ waitForCorruptBlock(1L);
// Ensure path1 exist.
- Assert.assertTrue(fs.exists(path1));
+ waitForFile(path1, true);
}
/**
@@ -215,10 +195,8 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
threads[i].start();
}
- Thread.sleep(500);
-
for (int i = 0; i < NUM_TASKS; i++) {
- Uninterruptibles.joinUninterruptibly(threads[i]);
+ ThreadUtil.joinUninterruptibly(threads[i]);
}
Assert.assertFalse(testFailed.get());
}
@@ -232,7 +210,7 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
*/
@Test
public void testConcurrentWrites()
- throws IOException, InterruptedException {
+ throws IOException, InterruptedException, TimeoutException {
getClusterBuilder().setRamDiskReplicaCapacity(9).build();
final String METHOD_NAME = GenericTestUtils.getMethodName();
final int SEED = 0xFADED;
@@ -281,11 +259,11 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
this.seed = seed;
this.latch = latch;
this.bFail = bFail;
- System.out.println("Creating Writer: " + id);
+ LOG.info("Creating Writer: {}", id);
}
public void run() {
- System.out.println("Writer " + id + " starting... ");
+ LOG.info("Writer {} starting... ", id);
int i = 0;
try {
for (i = 0; i < paths.length; i++) {
@@ -295,9 +273,8 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
}
} catch (IOException e) {
bFail.set(true);
- LOG.error("Writer exception: writer id:" + id +
- " testfile: " + paths[i].toString() +
- " " + e);
+ LOG.error("Writer exception: writer id:{} testfile: {}",
+ id, paths[i].toString(), e);
} finally {
latch.countDown();
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistReplicaPlacement.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistReplicaPlacement.java
index c16dbe5..b6413ec 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistReplicaPlacement.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistReplicaPlacement.java
@@ -119,7 +119,7 @@ public class TestLazyPersistReplicaPlacement extends LazyPersistTestCase {
*/
@Test
public void testFallbackToDiskPartial()
- throws IOException, InterruptedException {
+ throws IOException, InterruptedException, TimeoutException {
getClusterBuilder().setMaxLockedMemory(2 * BLOCK_SIZE).build();
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path = new Path("/" + METHOD_NAME + ".dat");
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyWriter.java
index 1680764..56cc41e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyWriter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyWriter.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Test;
@@ -156,7 +157,6 @@ public class TestLazyWriter extends LazyPersistTestCase {
for (int i = 0; i < NUM_PATHS; ++i) {
makeTestFile(paths[i + NUM_PATHS], BLOCK_SIZE, true);
triggerBlockReport();
- Thread.sleep(3000);
ensureFileReplicasOnStorageType(paths[i + NUM_PATHS], RAM_DISK);
ensureFileReplicasOnStorageType(paths[indexes.get(i)], DEFAULT);
for (int j = i + 1; j < NUM_PATHS; ++j) {
@@ -183,13 +183,13 @@ public class TestLazyWriter extends LazyPersistTestCase {
throws Exception {
getClusterBuilder().build();
final String METHOD_NAME = GenericTestUtils.getMethodName();
- FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));
+ final DataNode dn = cluster.getDataNodes().get(0);
+ FsDatasetTestUtil.stopLazyWriter(dn);
Path path = new Path("/" + METHOD_NAME + ".dat");
makeTestFile(path, BLOCK_SIZE, true);
LocatedBlocks locatedBlocks =
ensureFileReplicasOnStorageType(path, RAM_DISK);
-
// Delete before persist
client.delete(path.toString(), false);
Assert.assertFalse(fs.exists(path));
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org