Posted to common-commits@hadoop.apache.org by ki...@apache.org on 2020/02/27 15:45:45 UTC

[hadoop] branch branch-3.1 updated: HDFS-15147. LazyPersistTestCase wait logic is error-prone. Contributed by Ahmed Hussein.

This is an automated email from the ASF dual-hosted git repository.

kihwal pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new b92477c  HDFS-15147. LazyPersistTestCase wait logic is error-prone. Contributed by Ahmed Hussein.
b92477c is described below

commit b92477c638c9b9235868bfd13518e36545139c90
Author: Kihwal Lee <ki...@apache.org>
AuthorDate: Thu Feb 27 09:45:12 2020 -0600

    HDFS-15147. LazyPersistTestCase wait logic is error-prone. Contributed
    by Ahmed Hussein.
    
    (cherry picked from commit 27cfda708ef66dfbe5f52a5f1e716298a294f3f7)
    
    Conflicts:
    	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
    	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    	hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java
---
 .../java/org/apache/hadoop/util/ThreadUtil.java    |  28 ++
 .../org/apache/hadoop/test/GenericTestUtils.java   |  15 +-
 .../hdfs/server/blockmanagement/BlockManager.java  |  21 +-
 .../hadoop/hdfs/server/namenode/FSNamesystem.java  |  31 +-
 .../java/org/apache/hadoop/hdfs/DFSTestUtil.java   |  14 +-
 .../fsdataset/impl/LazyPersistTestCase.java        | 356 ++++++++++++++++-----
 .../fsdataset/impl/TestLazyPersistFiles.java       |  69 ++--
 .../impl/TestLazyPersistReplicaPlacement.java      |   2 +-
 .../datanode/fsdataset/impl/TestLazyWriter.java    |   6 +-
 9 files changed, 402 insertions(+), 140 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ThreadUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ThreadUtil.java
index 2cda8a4..f9ea3fc 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ThreadUtil.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ThreadUtil.java
@@ -50,6 +50,34 @@ public class ThreadUtil {
   }
 
   /**
+   * Join a thread uninterruptibly.
+   * The call continues to block until the joined thread terminates, even
+   * when the calling thread is interrupted.
+   * The method logs any {@link InterruptedException} and then re-interrupts
+   * the calling thread before returning.
+   *
+   * @param toJoin the thread to join on.
+   */
+  public static void joinUninterruptibly(Thread toJoin) {
+    boolean interrupted = false;
+    try {
+      while (true) {
+        try {
+          toJoin.join();
+          return;
+        } catch (InterruptedException e) {
+          interrupted = true;
+          LOG.warn("interrupted while sleeping", e);
+        }
+      }
+    } finally {
+      if (interrupted) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  /**
    * Convenience method that returns a resource as inputstream from the
    * classpath.
    * <p>
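
For context, a minimal sketch (the Runnable body is illustrative, and
org.apache.hadoop.util.ThreadUtil is assumed to be imported) of how the new
helper is meant to be called from test code:

    Thread worker = new Thread(() -> System.out.println("work done"));
    worker.start();
    // Block until the worker terminates; an interrupt arriving mid-join is
    // logged, and the interrupt flag is restored once the join completes.
    ThreadUtil.joinUninterruptibly(worker);
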
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
index 5479907..ba5644f 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
@@ -60,7 +60,6 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
 import com.google.common.collect.Sets;
 
@@ -369,11 +368,15 @@ public abstract class GenericTestUtils {
    * time
    * @throws InterruptedException if the method is interrupted while waiting
    */
-  public static void waitFor(Supplier<Boolean> check, int checkEveryMillis,
-      int waitForMillis) throws TimeoutException, InterruptedException {
-    Preconditions.checkNotNull(check, ERROR_MISSING_ARGUMENT);
-    Preconditions.checkArgument(waitForMillis >= checkEveryMillis,
-        ERROR_INVALID_ARGUMENT);
+  public static void waitFor(final Supplier<Boolean> check,
+      final long checkEveryMillis, final long waitForMillis)
+      throws TimeoutException, InterruptedException {
+    if (check == null) {
+      throw new NullPointerException(ERROR_MISSING_ARGUMENT);
+    }
+    if (waitForMillis < checkEveryMillis) {
+      throw new IllegalArgumentException(ERROR_INVALID_ARGUMENT);
+    }
 
     long st = Time.monotonicNow();
     boolean result = check.get();
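
With the reworked signature, callers can pass a lambda and long millisecond
values directly. A minimal sketch, assuming a test that flips an
AtomicBoolean from a hypothetical startAsyncTask() helper (the calling test
must declare TimeoutException and InterruptedException):

    final AtomicBoolean done = new AtomicBoolean(false);
    startAsyncTask(done); // hypothetical: sets the flag when work finishes
    // Poll every 10 ms; fail with TimeoutException after 10 seconds.
    GenericTestUtils.waitFor(done::get, 10L, 10000L);
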
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 34c6fd1..17d5603 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -48,6 +48,8 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
+
+import java.util.concurrent.atomic.AtomicLong;
 import javax.management.ObjectName;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -321,7 +323,12 @@ public class BlockManager implements BlockStatsMXBean {
 
   /** Redundancy thread. */
   private final Daemon redundancyThread = new Daemon(new RedundancyMonitor());
-
+  /**
+   * Timestamp marking the end time of {@link #redundancyThread}'s full cycle.
+   * This value can be checked by the JUnit tests to verify that the
+   * {@link #redundancyThread} has run at least one full iteration.
+   */
+  private final AtomicLong lastRedundancyCycleTS = new AtomicLong(-1);
   /** StorageInfoDefragmenter thread. */
   private final Daemon storageInfoDefragmenterThread =
       new Daemon(new StorageInfoDefragmenter());
@@ -4781,6 +4788,17 @@ public class BlockManager implements BlockStatsMXBean {
   }
 
   /**
+   * An ad hoc accessor for the timestamp of the last full cycle of
+   * {@link #redundancyThread}. JUnit tests use it to block until
+   * {@link #lastRedundancyCycleTS} is updated.
+   * @return the current {@link #lastRedundancyCycleTS}.
+   */
+  @VisibleForTesting
+  public long getLastRedundancyMonitorTS() {
+    return lastRedundancyCycleTS.get();
+  }
+
+  /**
    * Periodically calls computeBlockRecoveryWork().
    */
   private class RedundancyMonitor implements Runnable {
@@ -4794,6 +4812,7 @@ public class BlockManager implements BlockStatsMXBean {
             computeDatanodeWork();
             processPendingReconstructions();
             rescanPostponedMisreplicatedBlocks();
+            lastRedundancyCycleTS.set(Time.monotonicNow());
           }
           TimeUnit.MILLISECONDS.sleep(redundancyRecheckIntervalMs);
         } catch (Throwable t) {
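
Tests can block on the new accessor instead of sleeping for a fixed period.
A sketch of the intended pattern, assuming cluster is a test's
MiniDFSCluster (the same pattern backs the waitForRedundancyMonitorCycle
helper added to LazyPersistTestCase below):

    final BlockManager bm = cluster.getNamesystem().getBlockManager();
    final long before = bm.getLastRedundancyMonitorTS();
    // Resume only after the monitor records a newer full-cycle timestamp.
    GenericTestUtils.waitFor(
        () -> bm.getLastRedundancyMonitorTS() != before, 40L, 60000L);
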
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 474da2f..59a380c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -89,6 +89,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SNAPSHOT_DIFF_LISTING_LIMIT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SNAPSHOT_DIFF_LISTING_LIMIT_DEFAULT;
+
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import static org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.*;
 import static org.apache.hadoop.ha.HAServiceProtocol.HAServiceState.ACTIVE;
@@ -113,7 +115,6 @@ import java.io.DataInput;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.io.PrintWriter;
@@ -308,6 +309,7 @@ import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.VersionInfo;
 import org.apache.log4j.Appender;
 import org.apache.log4j.AsyncAppender;
@@ -486,7 +488,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   // A daemon to periodically clean up corrupt lazyPersist files
   // from the name space.
   Daemon lazyPersistFileScrubber = null;
-
+  /**
+   * Timestamp marking the end time of {@link #lazyPersistFileScrubber}'s full
+   * cycle. This value can be checked by the JUnit tests to verify that the
+   * {@link #lazyPersistFileScrubber} has run at least one full iteration.
+   */
+  private final AtomicLong lazyPersistFileScrubberTS = new AtomicLong(0);
   // Executor to warm up EDEK cache
   private ExecutorService edekCacheLoader = null;
   private final int edekCacheLoaderDelay;
@@ -645,6 +652,20 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     return leaseManager;
   }
 
+  /**
+   * An ad hoc accessor for the timestamp of the last full cycle of the {@link
+   * #lazyPersistFileScrubber} daemon. JUnit tests use it to block
+   * until {@link #lazyPersistFileScrubberTS} is updated.
+   *
+   * @return the current {@link #lazyPersistFileScrubberTS} if {@link
+   * #lazyPersistFileScrubber} is not null.
+   */
+  @VisibleForTesting
+  public long getLazyPersistFileScrubberTS() {
+    return lazyPersistFileScrubber == null ? -1
+        : lazyPersistFileScrubberTS.get();
+  }
+
   public boolean isHaEnabled() {
     return haEnabled;
   }
@@ -4116,10 +4137,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         try {
           if (!isInSafeMode()) {
             clearCorruptLazyPersistFiles();
+            // Set the timestamp of the last cycle.
+            lazyPersistFileScrubberTS.set(Time.monotonicNow());
           } else {
             if (FSNamesystem.LOG.isDebugEnabled()) {
-              FSNamesystem.LOG
-                  .debug("Namenode is in safemode, skipping scrubbing of corrupted lazy-persist files.");
+              FSNamesystem.LOG.debug("Namenode is in safemode, skipping "
+                  + "scrubbing of corrupted lazy-persist files.");
             }
           }
         } catch (Exception e) {
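
The accessor returns -1 when the scrubber daemon is not running, so callers
should treat that value as "nothing to wait for". A sketch, again assuming
cluster is a test's MiniDFSCluster:

    final FSNamesystem fsn = cluster.getNamesystem();
    final long last = fsn.getLazyPersistFileScrubberTS();
    if (last != -1) { // -1 means the scrubber is disabled; skip the wait.
      GenericTestUtils.waitFor(
          () -> fsn.getLazyPersistFileScrubberTS() != last, 40L, 6000L);
    }
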
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 3d0d882..2e26e5f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -2000,6 +2000,15 @@ public class DFSTestUtil {
     GenericTestUtils.setLogLevel(NameNode.blockStateChangeLog, level);
   }
 
+  public static void setNameNodeLogLevel(org.slf4j.event.Level level) {
+    GenericTestUtils.setLogLevel(FSNamesystem.LOG, level);
+    GenericTestUtils.setLogLevel(BlockManager.LOG, level);
+    GenericTestUtils.setLogLevel(LeaseManager.LOG, level);
+    GenericTestUtils.setLogLevel(NameNode.LOG, level);
+    GenericTestUtils.setLogLevel(NameNode.stateChangeLog, level);
+    GenericTestUtils.setLogLevel(NameNode.blockStateChangeLog, level);
+  }
+
   /**
    * Get the NamenodeProtocol RPC proxy for the NN associated with this
    * DFSClient object
@@ -2282,15 +2291,12 @@ public class DFSTestUtil {
       public Boolean get() {
         try {
           final int currentValue = Integer.parseInt(jmx.getValue(metricName));
-          LOG.info("Waiting for " + metricName +
-                       " to reach value " + expectedValue +
-                       ", current value = " + currentValue);
           return currentValue == expectedValue;
         } catch (Exception e) {
           throw new UnhandledException("Test failed due to unexpected exception", e);
         }
       }
-    }, 1000, 60000);
+    }, 50, 60000);
   }
 
   /**
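
The new overload accepts an org.slf4j.event.Level directly; a one-line
usage sketch:

    // Raise all NameNode-related loggers to DEBUG via the slf4j overload.
    DFSTestUtil.setNameNodeLogLevel(org.slf4j.event.Level.DEBUG);
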
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java
index aae59dd..52fabf3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java
@@ -17,7 +17,11 @@
  */
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
-import com.google.common.base.Supplier;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 
 import static org.apache.hadoop.fs.CreateFlag.CREATE;
@@ -45,6 +49,14 @@ import com.google.common.base.Preconditions;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -68,10 +80,13 @@ import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.net.unix.TemporarySocketDirectory;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.Rule;
 import org.junit.rules.Timeout;
+import org.slf4j.event.Level;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 
 public abstract class LazyPersistTestCase {
   static final byte LAZY_PERSIST_POLICY_ID = (byte) 15;
@@ -81,16 +96,33 @@ public abstract class LazyPersistTestCase {
     GenericTestUtils.setLogLevel(FsDatasetImpl.LOG, Level.DEBUG);
   }
 
+  protected static final Logger LOG =
+      LoggerFactory.getLogger(LazyPersistTestCase.class);
   protected static final int BLOCK_SIZE = 5 * 1024 * 1024;
   protected static final int BUFFER_LENGTH = 4096;
-  private static final long HEARTBEAT_INTERVAL_SEC = 1;
-  private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
-  private static final String JMX_RAM_DISK_METRICS_PATTERN = "^RamDisk";
-  private static final String JMX_SERVICE_NAME = "DataNode";
   protected static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3;
   protected static final int LAZY_WRITER_INTERVAL_SEC = 1;
-  protected static final Log LOG = LogFactory.getLog(LazyPersistTestCase.class);
   protected static final short REPL_FACTOR = 1;
+  private static final String JMX_RAM_DISK_METRICS_PATTERN = "^RamDisk";
+  private static final String JMX_SERVICE_NAME = "DataNode";
+  private static final long HEARTBEAT_INTERVAL_SEC = 1;
+  private static final int HEARTBEAT_RECHECK_INTERVAL_MS = 500;
+  private static final long WAIT_FOR_FBR_MS =
+      TimeUnit.SECONDS.toMillis(10);
+  private static final long WAIT_FOR_STORAGE_TYPES_MS =
+      TimeUnit.SECONDS.toMillis(30);
+  private static final long WAIT_FOR_ASYNC_DELETE_MS =
+      TimeUnit.SECONDS.toMillis(10);
+  private static final long WAIT_FOR_DN_SHUTDOWN_MS =
+      TimeUnit.SECONDS.toMillis(30);
+  private static final long WAIT_FOR_REDUNDANCY_MS =
+      TimeUnit.SECONDS
+          .toMillis(2 * DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_DEFAULT);
+  private static final long WAIT_FOR_LAZY_SCRUBBER_MS =
+      TimeUnit.SECONDS.toMillis(2 * LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC);
+  private static final long WAIT_POLL_INTERVAL_MS = 10;
+  private static final long WAIT_POLL_INTERVAL_LARGE_MS = 20;
+
   protected final long osPageSize =
       NativeIO.POSIX.getCacheManipulator().getOperatingSystemPageSize();
 
@@ -133,76 +165,79 @@ public abstract class LazyPersistTestCase {
       Path path, StorageType storageType)
       throws IOException, TimeoutException, InterruptedException {
     // Ensure that returned block locations returned are correct!
-    LOG.info("Ensure path: " + path + " is on StorageType: " + storageType);
+    LOG.info("Ensure path: {} is on StorageType: {}", path, storageType);
     assertThat(fs.exists(path), is(true));
     long fileLength = client.getFileInfo(path.toString()).getLen();
 
-    GenericTestUtils.waitFor(new Supplier<Boolean>() {
-      @Override
-      public Boolean get() {
-        try {
-          LocatedBlocks locatedBlocks =
-              client.getLocatedBlocks(path.toString(), 0, fileLength);
-          for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
-            if (locatedBlock.getStorageTypes()[0] != storageType) {
-              return false;
-            }
+    GenericTestUtils.waitFor(() -> {
+      try {
+        LocatedBlocks locatedBlocks =
+            client.getLocatedBlocks(path.toString(), 0, fileLength);
+        for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
+          if (locatedBlock.getStorageTypes()[0] != storageType) {
+            return false;
           }
-          return true;
-        } catch (IOException ioe) {
-          LOG.warn("Exception got in ensureFileReplicasOnStorageType()", ioe);
-          return false;
         }
+        return true;
+      } catch (IOException ioe) {
+        LOG.warn("Exception got in ensureFileReplicasOnStorageType()", ioe);
+        return false;
       }
-    }, 100, 30 * 1000);
+    }, WAIT_POLL_INTERVAL_MS, WAIT_FOR_STORAGE_TYPES_MS);
 
     return client.getLocatedBlocks(path.toString(), 0, fileLength);
   }
 
   /**
-   * Make sure at least one non-transient volume has a saved copy of the replica.
-   * An infinite loop is used to ensure the async lazy persist tasks are completely
-   * done before verification. Caller of ensureLazyPersistBlocksAreSaved expects
-   * either a successful pass or timeout failure.
+   * Make sure at least one non-transient volume has a saved copy of the
+   * replica. A bounded polling wait is used to ensure the async lazy persist
+   * tasks are completely done before verification.
+   * The caller of this method expects either a successful pass or a timeout
+   * failure.
+   *
+   * @param locatedBlocks the collection of blocks and their locations.
+   * @throws IOException if closing the auto-closeable volume references fails.
+   * @throws InterruptedException if the thread is interrupted.
+   * @throws TimeoutException if {@link #WAIT_FOR_STORAGE_TYPES_MS} expires
+   *                          before we find a persisted copy for each located
+   *                          block.
    */
   protected final void ensureLazyPersistBlocksAreSaved(
-      LocatedBlocks locatedBlocks) throws IOException, InterruptedException {
+      final LocatedBlocks locatedBlocks)
+      throws IOException, InterruptedException, TimeoutException {
     final String bpid = cluster.getNamesystem().getBlockPoolId();
 
     final Set<Long> persistedBlockIds = new HashSet<Long>();
-
+    // We should find a persisted copy for each located block.
     try (FsDatasetSpi.FsVolumeReferences volumes =
         cluster.getDataNodes().get(0).getFSDataset().getFsVolumeReferences()) {
-      while (persistedBlockIds.size() < locatedBlocks.getLocatedBlocks()
-          .size()) {
-        // Take 1 second sleep before each verification iteration
-        Thread.sleep(1000);
-
+      GenericTestUtils.waitFor(() -> {
         for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
           for (FsVolumeSpi v : volumes) {
             if (v.isTransientStorage()) {
               continue;
             }
-
             FsVolumeImpl volume = (FsVolumeImpl) v;
-            File lazyPersistDir =
-                volume.getBlockPoolSlice(bpid).getLazypersistDir();
-
+            File lazyPersistDir;
+            try {
+              lazyPersistDir =
+                  volume.getBlockPoolSlice(bpid).getLazypersistDir();
+            } catch (IOException ioe) {
+              return false;
+            }
             long blockId = lb.getBlock().getBlockId();
             File targetDir =
                 DatanodeUtil.idToBlockDir(lazyPersistDir, blockId);
             File blockFile = new File(targetDir, lb.getBlock().getBlockName());
             if (blockFile.exists()) {
-              // Found a persisted copy for this block and added to the Set
+              // Found a persisted copy for this block and added to the Set.
               persistedBlockIds.add(blockId);
             }
           }
         }
-      }
+        return (persistedBlockIds.size() ==
+            locatedBlocks.getLocatedBlocks().size());
+      }, WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_STORAGE_TYPES_MS);
     }
-
-    // We should have found a persisted copy for each located block.
-    assertThat(persistedBlockIds.size(), is(locatedBlocks.getLocatedBlocks().size()));
   }
 
   protected final void makeRandomTestFile(Path path, long length,
@@ -271,7 +306,7 @@ public abstract class LazyPersistTestCase {
     }
     conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL_SEC);
     conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
-                HEARTBEAT_RECHECK_INTERVAL_MSEC);
+        HEARTBEAT_RECHECK_INTERVAL_MS);
     conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
                 LAZY_WRITER_INTERVAL_SEC);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY, 1);
@@ -334,18 +369,18 @@ public abstract class LazyPersistTestCase {
       @Override
       public void mlock(String identifier,
                         ByteBuffer mmap, long length) throws IOException {
-        LOG.info("LazyPersistTestCase: faking mlock of " + identifier + " bytes.");
+        LOG.info("LazyPersistTestCase: faking mlock of {} bytes.", identifier);
       }
 
       @Override
       public long getMemlockLimit() {
-        LOG.info("LazyPersistTestCase: fake return " + Long.MAX_VALUE);
+        LOG.info("LazyPersistTestCase: fake return {}", Long.MAX_VALUE);
         return Long.MAX_VALUE;
       }
 
       @Override
       public boolean verifyCanMlock() {
-        LOG.info("LazyPersistTestCase: fake return " + true);
+        LOG.info("LazyPersistTestCase: fake return {}", true);
         return true;
       }
     });
@@ -413,8 +448,10 @@ public abstract class LazyPersistTestCase {
 
     public void build() throws IOException {
       LazyPersistTestCase.this.startUpCluster(
-          numDatanodes, hasTransientStorage, storageTypes, ramDiskReplicaCapacity,
-          ramDiskStorageLimit, maxLockedMemory, useScr, useLegacyBlockReaderLocal,
+          numDatanodes, hasTransientStorage, storageTypes,
+          ramDiskReplicaCapacity,
+          ramDiskStorageLimit, maxLockedMemory, useScr,
+          useLegacyBlockReaderLocal,
           disableScrubber);
     }
 
@@ -429,11 +466,44 @@ public abstract class LazyPersistTestCase {
     private boolean disableScrubber=false;
   }
 
+  /**
+   * Forces a full block report on all the datanodes. The call blocks waiting
+   * for all block reports to be received by the namenode.
+   *
+   * @throws IOException if an exception is thrown while getting the datanode
+   *                     descriptors or triggering the block reports.
+   * @throws InterruptedException if the thread receives an interrupt.
+   * @throws TimeoutException if the reports are not received within
+   *                          {@link #WAIT_FOR_FBR_MS}.
+   */
   protected final void triggerBlockReport()
-      throws IOException, InterruptedException {
+      throws InterruptedException, TimeoutException, IOException {
     // Trigger block report to NN
-    DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0));
-    Thread.sleep(10 * 1000);
+    final Map<DatanodeStorageInfo, Integer> reportCountsBefore =
+        new HashMap<>();
+    final FSNamesystem fsn = cluster.getNamesystem();
+    for (DataNode dn : cluster.getDataNodes()) {
+      final DatanodeDescriptor dnd =
+          NameNodeAdapter.getDatanode(fsn, dn.getDatanodeId());
+      final DatanodeStorageInfo storage = dnd.getStorageInfos()[0];
+      reportCountsBefore.put(storage, storage.getBlockReportCount());
+      DataNodeTestUtils.triggerBlockReport(dn);
+    }
+    // wait for block reports to be received.
+    GenericTestUtils.waitFor(() -> {
+      for (Entry<DatanodeStorageInfo, Integer> reportEntry :
+          reportCountsBefore.entrySet()) {
+        final DatanodeStorageInfo dnStorageInfo = reportEntry.getKey();
+        final int cntBefore = reportEntry.getValue();
+        final int currentCnt = dnStorageInfo.getBlockReportCount();
+        if (cntBefore == currentCnt) {
+          // Same count means no report has been received.
+          return false;
+        }
+      }
+      // If we reach here, then all the block reports have been received.
+      return true;
+    }, WAIT_POLL_INTERVAL_MS, WAIT_FOR_FBR_MS);
   }
 
   protected final boolean verifyBlockDeletedFromDir(File dir,
@@ -445,51 +515,58 @@ public abstract class LazyPersistTestCase {
 
       File blockFile = new File(targetDir, lb.getBlock().getBlockName());
       if (blockFile.exists()) {
-        LOG.warn("blockFile: " + blockFile.getAbsolutePath() +
-          " exists after deletion.");
         return false;
       }
       File metaFile = new File(targetDir,
         DatanodeUtil.getMetaName(lb.getBlock().getBlockName(),
           lb.getBlock().getGenerationStamp()));
       if (metaFile.exists()) {
-        LOG.warn("metaFile: " + metaFile.getAbsolutePath() +
-          " exists after deletion.");
         return false;
       }
     }
     return true;
   }
 
-  protected final boolean verifyDeletedBlocks(LocatedBlocks locatedBlocks)
-      throws IOException, InterruptedException {
+  protected final boolean verifyDeletedBlocks(final LocatedBlocks locatedBlocks)
+      throws Exception {
 
     LOG.info("Verifying replica has no saved copy after deletion.");
     triggerBlockReport();
+    final DataNode dn = cluster.getDataNodes().get(0);
 
-    while(
-        cluster.getFsDatasetTestUtils(0).getPendingAsyncDeletions()
-        > 0L){
-      Thread.sleep(1000);
-    }
+    GenericTestUtils.waitFor(() -> {
+      for (DataNode dn1 : cluster.getDataNodes()) {
+        if (cluster.getFsDatasetTestUtils(dn1).getPendingAsyncDeletions()
+            > 0) {
+          return false;
+        }
+      }
+      return true;
+    }, WAIT_POLL_INTERVAL_MS, WAIT_FOR_ASYNC_DELETE_MS);
 
     final String bpid = cluster.getNamesystem().getBlockPoolId();
-    final FsDatasetSpi<?> dataset =
-        cluster.getDataNodes().get(0).getFSDataset();
-
+    final FsDatasetSpi<?> dataset = dn.getFSDataset();
     // Make sure deleted replica does not have a copy on either finalized dir of
-    // transient volume or finalized dir of non-transient volume
+    // transient volume or finalized dir of non-transient volume.
+    // We need to wait until the async deletion is scheduled.
     try (FsDatasetSpi.FsVolumeReferences volumes =
         dataset.getFsVolumeReferences()) {
-      for (FsVolumeSpi vol : volumes) {
-        FsVolumeImpl volume = (FsVolumeImpl) vol;
-        File targetDir = (volume.isTransientStorage()) ?
-            volume.getBlockPoolSlice(bpid).getFinalizedDir() :
-            volume.getBlockPoolSlice(bpid).getLazypersistDir();
-        if (verifyBlockDeletedFromDir(targetDir, locatedBlocks) == false) {
+      GenericTestUtils.waitFor(() -> {
+        try {
+          for (FsVolumeSpi vol : volumes) {
+            FsVolumeImpl volume = (FsVolumeImpl) vol;
+            File targetDir = (volume.isTransientStorage()) ?
+                volume.getBlockPoolSlice(bpid).getFinalizedDir() :
+                volume.getBlockPoolSlice(bpid).getLazypersistDir();
+            if (!verifyBlockDeletedFromDir(targetDir, locatedBlocks)) {
+              return false;
+            }
+          }
+          return true;
+        } catch (IOException ie) {
           return false;
         }
-      }
+      }, WAIT_POLL_INTERVAL_MS, WAIT_FOR_ASYNC_DELETE_MS);
     }
     return true;
   }
@@ -530,8 +607,137 @@ public abstract class LazyPersistTestCase {
     DFSTestUtil.waitForMetric(jmx, metricName, expectedValue);
   }
 
-  protected void triggerEviction(DataNode dn) {
+  protected void triggerEviction(final DataNode dn) {
     FsDatasetImpl fsDataset = (FsDatasetImpl) dn.getFSDataset();
     fsDataset.evictLazyPersistBlocks(Long.MAX_VALUE); // Run one eviction cycle.
   }
+
+  /**
+   * Shuts down all datanodes in {@link #cluster}. The call blocks for up to
+   * {@link #WAIT_FOR_DN_SHUTDOWN_MS} until the client report shows no
+   * datanode labeled as live.
+   *
+   * @throws TimeoutException if {@link #WAIT_FOR_DN_SHUTDOWN_MS} expires with
+   * at least one datanode still alive.
+   * @throws InterruptedException if the thread receives an interrupt.
+   */
+  protected void shutdownDataNodes()
+      throws TimeoutException, InterruptedException {
+    cluster.shutdownDataNodes();
+    GenericTestUtils.waitFor(() -> {
+      try {
+        DatanodeInfo[] info = client.datanodeReport(
+            HdfsConstants.DatanodeReportType.LIVE);
+        return info.length == 0;
+      } catch (IOException e) {
+        return false;
+      }
+    }, WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_DN_SHUTDOWN_MS);
+  }
+
+  /**
+   * Blocks for up to {@link #WAIT_FOR_REDUNDANCY_MS} waiting for the corrupt
+   * block count to reach a certain value.
+   *
+   * @param corruptCnt the number of corrupt blocks to wait for before
+   *                   resuming.
+   * @throws TimeoutException if {@link #WAIT_FOR_REDUNDANCY_MS} expires
+   *                          before the corrupt count meets the criteria.
+   * @throws InterruptedException if the thread receives an interrupt.
+   */
+  protected void waitForCorruptBlock(final long corruptCnt)
+      throws TimeoutException, InterruptedException {
+    // wait for the redundancy monitor to mark the file as corrupt.
+    GenericTestUtils.waitFor(() -> {
+      Iterator<BlockInfo> bInfoIter = cluster.getNameNode()
+          .getNamesystem().getBlockManager().getCorruptReplicaBlockIterator();
+      int count = 0;
+      while (bInfoIter.hasNext()) {
+        bInfoIter.next();
+        count++;
+      }
+      return corruptCnt == count;
+    }, 2 * WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_REDUNDANCY_MS);
+  }
+
+  /**
+   * Blocks until {@link FSNamesystem#lazyPersistFileScrubber} daemon completes
+   * a full iteration.
+   *
+   * @throws InterruptedException if the thread receives an interrupt.
+   * @throws TimeoutException if
+   *                         {@link FSNamesystem#getLazyPersistFileScrubberTS()}
+   *                         does not update the timestamp within
+   *                         {@link #WAIT_FOR_LAZY_SCRUBBER_MS}.
+   */
+  protected void waitForScrubberCycle()
+      throws TimeoutException, InterruptedException {
+    // Wait for the scrubber daemon to complete at least one full cycle.
+    final FSNamesystem fsn = cluster.getNamesystem();
+    final long lastTimeStamp = fsn.getLazyPersistFileScrubberTS();
+    if (lastTimeStamp == -1) { // scrubber is disabled
+      return;
+    }
+    GenericTestUtils.waitFor(
+        () -> lastTimeStamp != fsn.getLazyPersistFileScrubberTS(),
+        2 * WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_LAZY_SCRUBBER_MS);
+  }
+
+  /**
+   * Blocks until {@link BlockManager#RedundancyMonitor} daemon completes
+   * a full iteration.
+   *
+   * @throws InterruptedException if the thread receives an interrupt.
+   * @throws TimeoutException if
+   *                          {@link BlockManager#getLastRedundancyMonitorTS()}
+   *                          does not update the timestamp within
+   *                          {@link #WAIT_FOR_REDUNDANCY_MS}.
+   */
+  protected void waitForRedundancyMonitorCycle()
+      throws TimeoutException, InterruptedException {
+    // Wait for the redundancy monitor to complete at least one full cycle.
+    final BlockManager bm = cluster.getNamesystem().getBlockManager();
+    final long lastRedundancyTS =
+        bm.getLastRedundancyMonitorTS();
+
+    GenericTestUtils.waitFor(
+        () -> lastRedundancyTS != bm.getLastRedundancyMonitorTS(),
+        2 * WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_REDUNDANCY_MS);
+  }
+
+  /**
+   * Blocks until {@link BlockManager#lowRedundancyBlocksCount} reaches a
+   * certain value.
+   *
+   * @param cnt the low redundancy block count to wait for.
+   * @throws InterruptedException if the thread receives an interrupt.
+   * @throws TimeoutException if
+   *                          {@link BlockManager#getLowRedundancyBlocksCount()}
+   *                          does not reach the count within
+   *                          {@link #WAIT_FOR_REDUNDANCY_MS}.
+   */
+  protected void waitForLowRedundancyCount(final long cnt)
+      throws TimeoutException, InterruptedException {
+    final BlockManager bm = cluster.getNamesystem().getBlockManager();
+
+    GenericTestUtils.waitFor(() -> cnt == bm.getLowRedundancyBlocksCount(),
+        2 * WAIT_POLL_INTERVAL_LARGE_MS, WAIT_FOR_REDUNDANCY_MS);
+  }
+
+  /**
+   * Blocks until the file status changes on the filesystem.
+   *
+   * @param path of the file to be checked.
+   * @param expected whether a file should exist or not.
+   * @throws TimeoutException if the file status does not match the expected
+   *                          value within {@link #WAIT_FOR_STORAGE_TYPES_MS}.
+   * @throws InterruptedException if the thread receives an interrupt.
+   */
+  protected void waitForFile(final Path path, final boolean expected)
+      throws TimeoutException, InterruptedException {
+    GenericTestUtils.waitFor(() -> {
+      try {
+        return expected == fs.exists(path);
+      } catch (IOException e) {
+        return false;
+      }
+    }, WAIT_POLL_INTERVAL_MS, WAIT_FOR_STORAGE_TYPES_MS);
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
index 04f8127..c0b4b17 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
@@ -16,11 +16,10 @@
  * limitations under the License.
  */
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
-import com.google.common.collect.Iterators;
-import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.ThreadUtil;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -33,7 +32,6 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.hadoop.fs.StorageType.RAM_DISK;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
 
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
@@ -76,7 +74,7 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
     makeTestFile(path, BLOCK_SIZE, true);
 
     try {
-      client.truncate(path.toString(), BLOCK_SIZE/2);
+      client.truncate(path.toString(), BLOCK_SIZE / 2);
       fail("Truncate to LazyPersist file did not fail as expected");
     } catch (Throwable t) {
       LOG.info("Got expected exception ", t);
@@ -98,28 +96,20 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
     makeTestFile(path1, BLOCK_SIZE, true);
     ensureFileReplicasOnStorageType(path1, RAM_DISK);
 
-    // Stop the DataNode and sleep for the time it takes the NN to
-    // detect the DN as being dead.
-    cluster.shutdownDataNodes();
-    Thread.sleep(30000L);
+    // Stop the DataNode.
+    shutdownDataNodes();
     assertThat(cluster.getNamesystem().getNumDeadDataNodes(), is(1));
 
     // Next, wait for the redundancy monitor to mark the file as corrupt.
-    Thread.sleep(2 * DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_DEFAULT * 1000);
-
+    waitForRedundancyMonitorCycle();
     // Wait for the LazyPersistFileScrubber to run
-    Thread.sleep(2 * LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC * 1000);
-
+    waitForScrubberCycle();
     // Ensure that path1 does not exist anymore, whereas path2 does.
-    assert(!fs.exists(path1));
+    waitForFile(path1, false);
 
     // We should have zero blocks that needs replication i.e. the one
-    // belonging to path2.
-    assertThat(cluster.getNameNode()
-                      .getNamesystem()
-                      .getBlockManager()
-                      .getLowRedundancyBlocksCount(),
-               is(0L));
+    // belonging to path2. This needs a wait.
+    waitForLowRedundancyCount(0L);
   }
 
   @Test
@@ -134,18 +124,14 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
 
     // Stop the DataNode and sleep for the time it takes the NN to
     // detect the DN as being dead.
-    cluster.shutdownDataNodes();
-    Thread.sleep(30000L);
-
-    // Next, wait for the redundancy monitor to mark the file as corrupt.
-    Thread.sleep(2 * DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_DEFAULT * 1000);
+    shutdownDataNodes();
 
+    // wait for the redundancy monitor to mark the file as corrupt.
+    waitForCorruptBlock(1L);
     // Wait for the LazyPersistFileScrubber to run
-    Thread.sleep(2 * LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC * 1000);
-
+    waitForScrubberCycle();
     // Ensure that path1 exist.
-    Assert.assertTrue(fs.exists(path1));
-
+    waitForFile(path1, true);
   }
 
  /**
@@ -160,20 +146,14 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
     makeTestFile(path1, BLOCK_SIZE, true);
     ensureFileReplicasOnStorageType(path1, RAM_DISK);
 
-    cluster.shutdownDataNodes();
+    shutdownDataNodes();
 
     cluster.restartNameNodes();
 
     // wait for the redundancy monitor to mark the file as corrupt.
-    Long corruptBlkCount;
-    do {
-      Thread.sleep(DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_DEFAULT * 1000);
-      corruptBlkCount = (long) Iterators.size(cluster.getNameNode()
-          .getNamesystem().getBlockManager().getCorruptReplicaBlockIterator());
-    } while (corruptBlkCount != 1L);
-
+    waitForCorruptBlock(1L);
     // Ensure path1 exist.
-    Assert.assertTrue(fs.exists(path1));
+    waitForFile(path1, true);
   }
 
   /**
@@ -215,10 +195,8 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
       threads[i].start();
     }
 
-    Thread.sleep(500);
-
     for (int i = 0; i < NUM_TASKS; i++) {
-      Uninterruptibles.joinUninterruptibly(threads[i]);
+      ThreadUtil.joinUninterruptibly(threads[i]);
     }
     Assert.assertFalse(testFailed.get());
   }
@@ -232,7 +210,7 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
    */
   @Test
   public void testConcurrentWrites()
-    throws IOException, InterruptedException {
+      throws IOException, InterruptedException, TimeoutException {
     getClusterBuilder().setRamDiskReplicaCapacity(9).build();
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     final int SEED = 0xFADED;
@@ -281,11 +259,11 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
       this.seed = seed;
       this.latch = latch;
       this.bFail = bFail;
-      System.out.println("Creating Writer: " + id);
+      LOG.info("Creating Writer: {}", id);
     }
 
     public void run() {
-      System.out.println("Writer " + id + " starting... ");
+      LOG.info("Writer {} starting... ", id);
       int i = 0;
       try {
         for (i = 0; i < paths.length; i++) {
@@ -295,9 +273,8 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
         }
       } catch (IOException e) {
         bFail.set(true);
-        LOG.error("Writer exception: writer id:" + id +
-          " testfile: " + paths[i].toString() +
-          " " + e);
+        LOG.error("Writer exception: writer id:{} testfile: {}",
+            id, paths[i].toString(), e);
       } finally {
         latch.countDown();
       }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistReplicaPlacement.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistReplicaPlacement.java
index c16dbe5..b6413ec 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistReplicaPlacement.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistReplicaPlacement.java
@@ -119,7 +119,7 @@ public class TestLazyPersistReplicaPlacement extends LazyPersistTestCase {
    */
   @Test
   public void testFallbackToDiskPartial()
-      throws IOException, InterruptedException {
+      throws IOException, InterruptedException, TimeoutException {
     getClusterBuilder().setMaxLockedMemory(2 * BLOCK_SIZE).build();
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path path = new Path("/" + METHOD_NAME + ".dat");
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyWriter.java
index 1680764..56cc41e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyWriter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyWriter.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
 import org.junit.Test;
@@ -156,7 +157,6 @@ public class TestLazyWriter extends LazyPersistTestCase {
     for (int i = 0; i < NUM_PATHS; ++i) {
       makeTestFile(paths[i + NUM_PATHS], BLOCK_SIZE, true);
       triggerBlockReport();
-      Thread.sleep(3000);
       ensureFileReplicasOnStorageType(paths[i + NUM_PATHS], RAM_DISK);
       ensureFileReplicasOnStorageType(paths[indexes.get(i)], DEFAULT);
       for (int j = i + 1; j < NUM_PATHS; ++j) {
@@ -183,13 +183,13 @@ public class TestLazyWriter extends LazyPersistTestCase {
       throws Exception {
     getClusterBuilder().build();
     final String METHOD_NAME = GenericTestUtils.getMethodName();
-    FsDatasetTestUtil.stopLazyWriter(cluster.getDataNodes().get(0));
+    final DataNode dn = cluster.getDataNodes().get(0);
+    FsDatasetTestUtil.stopLazyWriter(dn);
 
     Path path = new Path("/" + METHOD_NAME + ".dat");
     makeTestFile(path, BLOCK_SIZE, true);
     LocatedBlocks locatedBlocks =
         ensureFileReplicasOnStorageType(path, RAM_DISK);
-
     // Delete before persist
     client.delete(path.toString(), false);
     Assert.assertFalse(fs.exists(path));


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org