Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2014/10/18 01:31:02 UTC

[15/34] git commit: HDFS-6991. Notify NN of evicted block before deleting it from RAM disk. (Arpit Agarwal)

HDFS-6991. Notify NN of evicted block before deleting it from RAM disk. (Arpit Agarwal)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
	hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bcec6582
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bcec6582
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bcec6582

Branch: refs/heads/branch-2.6
Commit: bcec6582cf9ae1411d31b2a6cf97c2169953f5fe
Parents: 186d195
Author: arp <ar...@apache.org>
Authored: Mon Sep 8 14:29:30 2014 -0700
Committer: Jitendra Pandey <Ji...@Jitendra-Pandeys-MacBook-Pro-4.local>
Committed: Fri Oct 17 16:00:50 2014 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   4 +
 .../server/blockmanagement/BlockManager.java    |   4 +-
 .../hdfs/server/datanode/BPOfferService.java    |   6 -
 .../hadoop/hdfs/server/datanode/DataNode.java   |   2 +-
 .../datanode/fsdataset/impl/FsDatasetImpl.java  | 101 +++++++-----
 .../org/apache/hadoop/hdfs/MiniDFSCluster.java  |   3 +-
 .../fsdataset/impl/TestLazyPersistFiles.java    | 157 ++++++++-----------
 7 files changed, 130 insertions(+), 147 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcec6582/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index e12b01a..f6c0180 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -129,6 +129,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int     DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_DEFAULT = 4;
   public static final String  DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC = "dfs.datanode.lazywriter.interval.sec";
   public static final int     DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC = 60;
+  public static final String  DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT = "dfs.datanode.ram.disk.low.watermark.percent";
+  public static final int     DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT_DEFAULT = 10;
+  public static final String  DFS_DATANODE_RAM_DISK_LOW_WATERMARK_REPLICAS = "dfs.datanode.ram.disk.low.watermark.replicas";
+  public static final int     DFS_DATANODE_RAM_DISK_LOW_WATERMARK_REPLICAS_DEFAULT = 3;
   public static final String  DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT =
     "dfs.namenode.path.based.cache.block.map.allocation.percent";
   public static final float    DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT_DEFAULT = 0.25f;

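The two new keys control when the lazy writer begins evicting replicas from
RAM disk: eviction starts once free transient space falls below the given
percentage of capacity, or below room for the given number of full block
replicas (see transientFreeSpaceBelowThreshold in the FsDatasetImpl hunk
further down). A minimal sketch, not part of this commit, of overriding the
defaults before starting a DataNode:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class RamDiskWatermarkExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Evict once less than 20% of RAM disk capacity is free...
        conf.setInt(
            DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT, 20);
        // ...or once there is no room left for 5 more full block replicas.
        conf.setInt(
            DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_REPLICAS, 5);
      }
    }
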
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcec6582/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 81b9bb5..e95f705 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -2119,8 +2119,8 @@ public class BlockManager {
     // Add replica if appropriate. If the replica was previously corrupt
     // but now okay, it might need to be updated.
     if (reportedState == ReplicaState.FINALIZED
-        && (!storedBlock.findDatanode(dn)
-        || corruptReplicas.isReplicaCorrupt(storedBlock, dn))) {
+        && (storedBlock.findStorageInfo(storageInfo) == -1 ||
+            corruptReplicas.isReplicaCorrupt(storedBlock, dn))) {
       toAdd.add(storedBlock);
     }
     return storedBlock;

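The membership check above moves from datanode granularity to storage
granularity. After an eviction the replica lives on a different storage of
the same DataNode, so findDatanode(dn) would still report the block as
present and the NN would never record the new location; findStorageInfo
returning -1 detects that this particular storage has no replica yet. A
hypothetical helper, not part of the commit and using the types from the
hunk above, that captures the new condition:

    // Illustrative only; BlockInfo, DatanodeStorageInfo and
    // DatanodeDescriptor are the types used in the hunk above.
    private boolean shouldAddReplica(BlockInfo storedBlock,
        DatanodeStorageInfo storageInfo, DatanodeDescriptor dn) {
      // -1 means no replica is recorded on this specific storage, even if
      // another storage on the same DataNode (e.g. its RAM disk) has one.
      return storedBlock.findStorageInfo(storageInfo) == -1
          || corruptReplicas.isReplicaCorrupt(storedBlock, dn);
    }
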
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcec6582/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index 2eab87f..1455a8a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -230,7 +230,6 @@ class BPOfferService {
   void notifyNamenodeReceivedBlock(
       ExtendedBlock block, String delHint, String storageUuid) {
     checkBlock(block);
-    checkDelHint(delHint);
     ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(
         block.getLocalBlock(),
         ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK,
@@ -249,11 +248,6 @@ class BPOfferService {
         block.getBlockPoolId(), getBlockPoolId());
   }
   
-  private void checkDelHint(String delHint) {
-    Preconditions.checkArgument(delHint != null,
-        "delHint is null");
-  }
-
   void notifyNamenodeDeletedBlock(ExtendedBlock block, String storageUuid) {
     checkBlock(block);
     ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(

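Dropping checkDelHint() relaxes the contract so that delHint may be null
when no existing replica is being replaced. The eviction path added to
FsDatasetImpl below relies on exactly that; its call, reproduced here from
the diff for illustration, has no counterpart replica to hint for deletion:

    ExtendedBlock extendedBlock =
        new ExtendedBlock(replicaState.bpid, newReplicaInfo);
    datanode.notifyNamenodeReceivedBlock(
        extendedBlock, null /* no deletion hint */,
        newReplicaInfo.getStorageUuid());
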
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcec6582/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index a4db56a..bf09899 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -897,7 +897,7 @@ public class DataNode extends ReconfigurableBase
   }
   
   // calls specific to BP
-  protected void notifyNamenodeReceivedBlock(
+  public void notifyNamenodeReceivedBlock(
       ExtendedBlock block, String delHint, String storageUuid) {
     BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
     if(bpos != null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcec6582/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 19260db..2b16a65 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -47,7 +47,6 @@ import javax.management.StandardMBean;
 
 import com.google.common.collect.Lists;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.TreeMultimap;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -282,9 +281,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     cacheManager = new FsDatasetCache(this);
 
     // Start the lazy writer once we have built the replica maps.
-    lazyWriter = new Daemon(new LazyWriter(
-        conf.getInt(DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
-                    DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC)));
+    lazyWriter = new Daemon(new LazyWriter(conf));
     lazyWriter.start();
     registerMBean(datanode.getDatanodeUuid());
   }
@@ -2294,16 +2291,23 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     private volatile boolean shouldRun = true;
     final int checkpointerInterval;
     final long estimateBlockSize;
+    final int lowWatermarkFreeSpacePercentage;
+    final int lowWatermarkFreeSpaceReplicas;
 
-    public static final int LOW_WATERMARK_FREE_SPACE_PERCENT = 10;
-    public static final int LOW_WATERMARK_FREE_SPACE_REPLICAS = 3;
 
-
-    public LazyWriter(final int checkpointerInterval) {
-      this.checkpointerInterval = checkpointerInterval;
+    public LazyWriter(Configuration conf) {
+      this.checkpointerInterval = conf.getInt(
+          DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
+          DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC);
       this.estimateBlockSize = conf.getLongBytes(
           DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
           DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
+      this.lowWatermarkFreeSpacePercentage = conf.getInt(
+          DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT,
+          DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT_DEFAULT);
+      this.lowWatermarkFreeSpaceReplicas = conf.getInt(
+          DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_REPLICAS,
+          DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_REPLICAS_DEFAULT);
     }
 
     private void moveReplicaToNewVolume(String bpid, long blockId)
@@ -2390,49 +2394,63 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       }
 
       int percentFree = (int) (free * 100 / capacity);
-      return percentFree < LOW_WATERMARK_FREE_SPACE_PERCENT ||
-             free < (estimateBlockSize * LOW_WATERMARK_FREE_SPACE_REPLICAS);
+      return percentFree < lowWatermarkFreeSpacePercentage ||
+             free < (estimateBlockSize * lowWatermarkFreeSpaceReplicas);
     }
 
     /**
     * Attempt to evict one or more transient block replicas until we have at
     * least spaceNeeded bytes free.
      */
-    private synchronized void evictBlocks() throws IOException {
+    private void evictBlocks() throws IOException {
       int iterations = 0;
 
-      LazyWriteReplicaTracker.ReplicaState replicaState =
-          lazyWriteReplicaTracker.getNextCandidateForEviction();
-
-      while (replicaState != null &&
-             iterations++ < MAX_BLOCK_EVICTIONS_PER_ITERATION &
+      while (iterations++ < MAX_BLOCK_EVICTIONS_PER_ITERATION &&
              transientFreeSpaceBelowThreshold()) {
+        LazyWriteReplicaTracker.ReplicaState replicaState =
+            lazyWriteReplicaTracker.getNextCandidateForEviction();
+
         if (LOG.isDebugEnabled()) {
-          LOG.info("Evicting block " + replicaState);
+          LOG.debug("Evicting block " + replicaState);
         }
-        ReplicaInfo replicaInfo = getReplicaInfo(replicaState.bpid, replicaState.blockId);
-        Preconditions.checkState(replicaInfo.getVolume().isTransientStorage());
-        File blockFile = replicaInfo.getBlockFile();
-        File metaFile = replicaInfo.getMetaFile();
-        long blockFileUsed = blockFile.length();
-        long metaFileUsed = metaFile.length();
-        lazyWriteReplicaTracker.discardReplica(replicaState, false);
-
-        // Move the replica from lazyPersist/ to finalized/ on target volume
-        BlockPoolSlice bpSlice =
-            replicaState.lazyPersistVolume.getBlockPoolSlice(replicaState.bpid);
-        File newBlockFile = bpSlice.activateSavedReplica(
-            replicaInfo, replicaState.savedBlockFile);
-
-        ReplicaInfo newReplicaInfo =
-            new FinalizedReplica(replicaInfo.getBlockId(),
-                                 replicaInfo.getBytesOnDisk(),
-                                 replicaInfo.getGenerationStamp(),
-                                 replicaState.lazyPersistVolume,
-                                 newBlockFile.getParentFile());
-
-        // Update the volumeMap entry. This removes the old entry.
-        volumeMap.add(replicaState.bpid, newReplicaInfo);
+
+        ReplicaInfo replicaInfo, newReplicaInfo;
+        File blockFile, metaFile;
+        long blockFileUsed, metaFileUsed;
+
+        synchronized (FsDatasetImpl.this) {
+          replicaInfo = getReplicaInfo(replicaState.bpid, replicaState.blockId);
+          Preconditions.checkState(replicaInfo.getVolume().isTransientStorage());
+          blockFile = replicaInfo.getBlockFile();
+          metaFile = replicaInfo.getMetaFile();
+          blockFileUsed = blockFile.length();
+          metaFileUsed = metaFile.length();
+          lazyWriteReplicaTracker.discardReplica(replicaState, false);
+
+          // Move the replica from lazyPersist/ to finalized/ on target volume
+          BlockPoolSlice bpSlice =
+              replicaState.lazyPersistVolume.getBlockPoolSlice(replicaState.bpid);
+          File newBlockFile = bpSlice.activateSavedReplica(
+              replicaInfo, replicaState.savedBlockFile);
+
+          newReplicaInfo =
+              new FinalizedReplica(replicaInfo.getBlockId(),
+                                   replicaInfo.getBytesOnDisk(),
+                                   replicaInfo.getGenerationStamp(),
+                                   replicaState.lazyPersistVolume,
+                                   newBlockFile.getParentFile());
+
+          // Update the volumeMap entry.
+          volumeMap.add(replicaState.bpid, newReplicaInfo);
+        }
+
+        // Before deleting the files from transient storage we must notify the
+        // NN that the files are on the new storage. Otherwise a block report
+        // from the transient storage might cause the NN to think the blocks
+        // are lost.
+        ExtendedBlock extendedBlock =
+            new ExtendedBlock(replicaState.bpid, newReplicaInfo);
+        datanode.notifyNamenodeReceivedBlock(
+            extendedBlock, null, newReplicaInfo.getStorageUuid());
 
         // Remove the old replicas from transient storage.
         if (blockFile.delete() || !blockFile.exists()) {
@@ -2444,7 +2462,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
         // If deletion failed then the directory scanner will cleanup the blocks
         // eventually.
-        replicaState = lazyWriteReplicaTracker.getNextCandidateForEviction();
       }
     }
 

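The heart of the change is the ordering in evictBlocks(): promote the saved
replica under the dataset lock, notify the NN of the new location, and only
then delete the RAM disk copy. Below is a self-contained toy model of that
ordering; every type in it is a stand-in invented for illustration, and the
null check on the eviction candidate is a defensive addition of this sketch
rather than something the hunk above performs:

    import java.util.Optional;

    class EvictionOrderSketch {
      interface Tracker { Optional<String> nextCandidateForEviction(); }
      interface NameNode { void notifyReceivedBlock(String blockId, String storageUuid); }
      interface Storage {
        void activateSavedReplica(String blockId);
        void deleteTransientCopy(String blockId);
      }

      private final Object datasetLock = new Object();

      void evictOne(Tracker tracker, NameNode nn, Storage storage) {
        Optional<String> candidate = tracker.nextCandidateForEviction();
        if (!candidate.isPresent()) {
          return;  // nothing is eligible for eviction
        }
        String blockId = candidate.get();

        synchronized (datasetLock) {
          // Step 1: under the dataset lock, move the lazily persisted copy
          // to finalized/ on the target volume and update the replica map.
          storage.activateSavedReplica(blockId);
        }

        // Step 2: notify the NN of the new storage BEFORE deleting the RAM
        // disk copy, so an intervening block report cannot make the block
        // look lost.
        nn.notifyReceivedBlock(blockId, "new-storage-uuid");

        // Step 3: only now remove the transient copy; a failed delete is
        // cleaned up later by the directory scanner.
        storage.deleteTransientCopy(blockId);
      }
    }
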
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcec6582/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index 338744d..7034715 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -1343,6 +1343,7 @@ public class MiniDFSCluster {
     }
 
     int curDatanodesNum = dataNodes.size();
+    final int curDatanodesNumSaved = curDatanodesNum;
     // for mincluster's the default initialDelay for BRs is 0
     if (conf.get(DFS_BLOCKREPORT_INITIAL_DELAY_KEY) == null) {
       conf.setLong(DFS_BLOCKREPORT_INITIAL_DELAY_KEY, 0);
@@ -1481,7 +1482,7 @@ public class MiniDFSCluster {
     waitActive();
     
     if (storageCapacities != null) {
-      for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; ++i) {
+      for (int i = curDatanodesNumSaved; i < curDatanodesNumSaved+numDataNodes; ++i) {
         final int index = i - curDatanodesNum;
         List<? extends FsVolumeSpi> volumes = dns[index].getFSDataset().getVolumes();
         assert storageCapacities[index].length == storagesPerDatanode;

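The MiniDFSCluster change snapshots curDatanodesNum into a final local
before the new DataNodes are started, so the capacity loop ranges over only
the freshly added nodes. A self-contained toy of that pattern, assuming, as
the fix implies, that the live counter is incremented while the nodes start
up:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    class SnapshotBoundSketch {
      public static void main(String[] args) {
        List<String> nodes = new ArrayList<>(Arrays.asList("dn0", "dn1"));
        int curCount = nodes.size();      // 2 pre-existing nodes
        final int savedCount = curCount;  // snapshot before mutation

        int numToAdd = 3;
        for (int i = 0; i < numToAdd; i++) {
          nodes.add("dn" + curCount);
          curCount++;                     // the live counter moves on
        }

        // Range over only the newly added nodes using the snapshot.
        for (int i = savedCount; i < savedCount + numToAdd; i++) {
          System.out.println("set capacity for " + nodes.get(i));
        }
      }
    }
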
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcec6582/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
index 7dfba50..fcc4798 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
@@ -23,7 +23,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CreateFlag;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -71,13 +70,14 @@ public class TestLazyPersistFiles {
 
   private static final int THREADPOOL_SIZE = 10;
 
-  private static short REPL_FACTOR = 1;
+  private static final short REPL_FACTOR = 1;
   private static final int BLOCK_SIZE = 10485760;   // 10 MB
   private static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3;
   private static final long HEARTBEAT_INTERVAL_SEC = 1;
   private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
   private static final int LAZY_WRITER_INTERVAL_SEC = 1;
   private static final int BUFFER_LENGTH = 4096;
+  private static final int EVICTION_LOW_WATERMARK = 1;
 
   private MiniDFSCluster cluster;
   private DistributedFileSystem fs;
@@ -101,7 +101,7 @@ public class TestLazyPersistFiles {
 
   @Test (timeout=300000)
   public void testFlagNotSetByDefault() throws IOException {
-    startUpCluster(REPL_FACTOR, null, -1);
+    startUpCluster(false, -1);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path path = new Path("/" + METHOD_NAME + ".dat");
 
@@ -113,7 +113,7 @@ public class TestLazyPersistFiles {
 
   @Test (timeout=300000)
   public void testFlagPropagation() throws IOException {
-    startUpCluster(REPL_FACTOR, null, -1);
+    startUpCluster(false, -1);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path path = new Path("/" + METHOD_NAME + ".dat");
 
@@ -125,7 +125,7 @@ public class TestLazyPersistFiles {
 
   @Test (timeout=300000)
   public void testFlagPersistenceInEditLog() throws IOException {
-    startUpCluster(REPL_FACTOR, null, -1);
+    startUpCluster(false, -1);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path path = new Path("/" + METHOD_NAME + ".dat");
 
@@ -139,10 +139,9 @@ public class TestLazyPersistFiles {
 
   @Test (timeout=300000)
   public void testFlagPersistenceInFsImage() throws IOException {
-    startUpCluster(REPL_FACTOR, null, -1);
+    startUpCluster(false, -1);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path path = new Path("/" + METHOD_NAME + ".dat");
-    FSDataOutputStream fos = null;
 
     makeTestFile(path, 0, true);
     // checkpoint
@@ -158,7 +157,7 @@ public class TestLazyPersistFiles {
 
   @Test (timeout=300000)
   public void testPlacementOnRamDisk() throws IOException {
-    startUpCluster(REPL_FACTOR, new StorageType[] { DEFAULT, RAM_DISK}, -1);
+    startUpCluster(true, -1);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path path = new Path("/" + METHOD_NAME + ".dat");
 
@@ -168,8 +167,7 @@ public class TestLazyPersistFiles {
 
   @Test (timeout=300000)
   public void testPlacementOnSizeLimitedRamDisk() throws IOException {
-    startUpCluster(REPL_FACTOR, new StorageType[] { DEFAULT, RAM_DISK },
-      3 * BLOCK_SIZE -1); // 2 replicas + delta
+    startUpCluster(true, 3);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
     Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
@@ -188,7 +186,7 @@ public class TestLazyPersistFiles {
    */
   @Test (timeout=300000)
   public void testFallbackToDisk() throws IOException {
-    startUpCluster(REPL_FACTOR, null, -1);
+    startUpCluster(false, -1);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path path = new Path("/" + METHOD_NAME + ".dat");
 
@@ -202,7 +200,7 @@ public class TestLazyPersistFiles {
    */
   @Test (timeout=300000)
   public void testFallbackToDiskFull() throws IOException {
-    startUpCluster(REPL_FACTOR, null, BLOCK_SIZE - 1);
+    startUpCluster(false, 0);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path path = new Path("/" + METHOD_NAME + ".dat");
 
@@ -213,15 +211,13 @@ public class TestLazyPersistFiles {
   /**
    * File partially fit in RamDisk after eviction.
    * RamDisk can fit 2 blocks. Write a file with 5 blocks.
-   * Expect 2 blocks are on RamDisk whereas other 3 on disk.
+   * Expect 2 or fewer blocks on RamDisk and 3 or more on disk.
    * @throws IOException
    */
   @Test (timeout=300000)
   public void testFallbackToDiskPartial()
     throws IOException, InterruptedException {
-    startUpCluster(REPL_FACTOR,
-        new StorageType[] { RAM_DISK, DEFAULT },
-        BLOCK_SIZE * 3 - 1);
+    startUpCluster(true, 2);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path path = new Path("/" + METHOD_NAME + ".dat");
 
@@ -241,12 +237,15 @@ public class TestLazyPersistFiles {
     for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
       if (locatedBlock.getStorageTypes()[0] == RAM_DISK) {
         numBlocksOnRamDisk++;
-      }else if (locatedBlock.getStorageTypes()[0] == DEFAULT) {
+      } else if (locatedBlock.getStorageTypes()[0] == DEFAULT) {
         numBlocksOnDisk++;
       }
     }
-    assertThat(numBlocksOnRamDisk, is(2));
-    assertThat(numBlocksOnDisk, is(3));
+
+    // Since eviction is asynchronous, depending on the timing of eviction
+    // relative to writes, we may get 2 or fewer blocks on RAM disk.
+    assert(numBlocksOnRamDisk <= 2);
+    assert(numBlocksOnDisk >= 3);
   }
 
   /**
@@ -257,7 +256,7 @@ public class TestLazyPersistFiles {
    */
   @Test (timeout=300000)
   public void testRamDiskNotChosenByDefault() throws IOException {
-    startUpCluster(REPL_FACTOR, new StorageType[] {RAM_DISK, RAM_DISK}, -1);
+    startUpCluster(true, -1);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path path = new Path("/" + METHOD_NAME + ".dat");
 
@@ -275,7 +274,7 @@ public class TestLazyPersistFiles {
    */
   @Test (timeout=300000)
   public void testAppendIsDenied() throws IOException {
-    startUpCluster(REPL_FACTOR, new StorageType[] {RAM_DISK, DEFAULT }, -1);
+    startUpCluster(true, -1);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path path = new Path("/" + METHOD_NAME + ".dat");
 
@@ -297,17 +296,12 @@ public class TestLazyPersistFiles {
   @Test (timeout=300000)
   public void testLazyPersistFilesAreDiscarded()
       throws IOException, InterruptedException {
-    startUpCluster(REPL_FACTOR,
-                   new StorageType[] { RAM_DISK, DEFAULT },
-                   (2 * BLOCK_SIZE - 1));   // 1 replica + delta.
+    startUpCluster(true, 2);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
-    Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
 
     makeTestFile(path1, BLOCK_SIZE, true);
-    makeTestFile(path2, BLOCK_SIZE, false);
     ensureFileReplicasOnStorageType(path1, RAM_DISK);
-    ensureFileReplicasOnStorageType(path2, DEFAULT);
 
     // Stop the DataNode and sleep for the time it takes the NN to
     // detect the DN as being dead.
@@ -315,30 +309,28 @@ public class TestLazyPersistFiles {
     Thread.sleep(30000L);
     assertThat(cluster.getNamesystem().getNumDeadDataNodes(), is(1));
 
-    // Next, wait for the replication monitor to mark the file as
-    // corrupt, plus some delta.
+    // Next, wait for the replication monitor to mark the file as corrupt
     Thread.sleep(2 * DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT * 1000);
 
-    // Wait for the LazyPersistFileScrubber to run, plus some delta.
+    // Wait for the LazyPersistFileScrubber to run
     Thread.sleep(2 * LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC * 1000);
 
-    // Ensure that path1 does not exist anymore, whereas path2 does.
+    // Ensure that path1 does not exist anymore.
     assert(!fs.exists(path1));
-    assert(fs.exists(path2));
 
-    // We should have only one block that needs replication i.e. the one
-    // belonging to path2.
+    // We should have zero blocks that need replication, since path2 was
+    // removed from this test.
     assertThat(cluster.getNameNode()
                       .getNamesystem()
                       .getBlockManager()
                       .getUnderReplicatedBlocksCount(),
-               is(1L));
+               is(0L));
   }
 
   @Test (timeout=300000)
   public void testLazyPersistBlocksAreSaved()
       throws IOException, InterruptedException {
-    startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT }, -1);
+    startUpCluster(true, -1);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path path = new Path("/" + METHOD_NAME + ".dat");
 
@@ -386,16 +378,12 @@ public class TestLazyPersistFiles {
 
   /**
    * RamDisk eviction after lazy persist to disk.
-   * Evicted blocks are still readable with on-disk replicas.
    * @throws IOException
    * @throws InterruptedException
    */
- @Test (timeout=300000)
-  public void testRamDiskEviction()
-      throws IOException, InterruptedException {
-    startUpCluster(REPL_FACTOR,
-        new StorageType[] { RAM_DISK, DEFAULT },
-        (2 * BLOCK_SIZE - 1));     // 1 replica + delta.
+  @Test (timeout=300000)
+  public void testRamDiskEviction() throws IOException, InterruptedException {
+    startUpCluster(true, 1 + EVICTION_LOW_WATERMARK);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
     Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
@@ -405,16 +393,16 @@ public class TestLazyPersistFiles {
     ensureFileReplicasOnStorageType(path1, RAM_DISK);
 
     // Sleep for a short time to allow the lazy writer thread to do its job.
-    // However the block replica should not be evicted from RAM_DISK yet.
     Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
     ensureFileReplicasOnStorageType(path1, RAM_DISK);
 
     // Create another file with a replica on RAM_DISK.
     makeTestFile(path2, BLOCK_SIZE, true);
+    Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
     triggerBlockReport();
 
-    // Make sure that the second file's block replica is on RAM_DISK, whereas
-    // the original file's block replica is now on disk.
+    // Ensure the first file was evicted to disk, the second is still on
+    // RAM_DISK.
     ensureFileReplicasOnStorageType(path2, RAM_DISK);
     ensureFileReplicasOnStorageType(path1, DEFAULT);
   }
@@ -428,9 +416,7 @@ public class TestLazyPersistFiles {
   @Test (timeout=300000)
   public void testRamDiskEvictionBeforePersist()
     throws IOException, InterruptedException {
-    // 1 replica + delta, lazy persist interval every 50 minutes
-    startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT },
-      (2 * BLOCK_SIZE - 1));
+    startUpCluster(true, 1);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
     Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
@@ -463,8 +449,7 @@ public class TestLazyPersistFiles {
   @Test (timeout=300000)
   public void testRamDiskEvictionLRU()
     throws IOException, InterruptedException {
-    startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT },
-      (4 * BLOCK_SIZE -1));  // 3 replica + delta.
+    startUpCluster(true, 3);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     final int NUM_PATHS = 6;
     Path paths[] = new Path[NUM_PATHS];
@@ -501,8 +486,7 @@ public class TestLazyPersistFiles {
   @Test (timeout=300000)
   public void testDeleteBeforePersist()
     throws IOException, InterruptedException {
-    startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT },
-      -1);
+    startUpCluster(true, -1);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     stopLazyWriter(cluster.getDataNodes().get(0));
 
@@ -527,7 +511,7 @@ public class TestLazyPersistFiles {
   @Test (timeout=300000)
   public void testDeleteAfterPersist()
     throws IOException, InterruptedException {
-    startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT }, -1);
+    startUpCluster(true, -1);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path path = new Path("/" + METHOD_NAME + ".dat");
 
@@ -554,8 +538,7 @@ public class TestLazyPersistFiles {
   @Test (timeout=300000)
   public void testDfsUsageCreateDelete()
     throws IOException, InterruptedException {
-    startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT },
-      5 * BLOCK_SIZE - 1);  // 4 replica + delta
+    startUpCluster(true, 4);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path path = new Path("/" + METHOD_NAME + ".dat");
 
@@ -586,8 +569,7 @@ public class TestLazyPersistFiles {
   @Test (timeout=300000)
   public void testConcurrentRead()
     throws Exception {
-    startUpCluster(REPL_FACTOR, new StorageType[] { DEFAULT, RAM_DISK },
-      3 * BLOCK_SIZE -1); // 2 replicas + delta
+    startUpCluster(true, 2);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     final Path path1 = new Path("/" + METHOD_NAME + ".dat");
 
@@ -638,8 +620,7 @@ public class TestLazyPersistFiles {
   @Test (timeout=300000)
   public void testConcurrentWrites()
     throws IOException, InterruptedException {
-    startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT },
-      (10 * BLOCK_SIZE -1)); // 9 replica + delta.
+    startUpCluster(true, 9);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     final int SEED = 0xFADED;
     final int NUM_WRITERS = 4;
@@ -659,8 +640,7 @@ public class TestLazyPersistFiles {
 
     ExecutorService executor = Executors.newFixedThreadPool(THREADPOOL_SIZE);
     for (int i = 0; i < NUM_WRITERS; i++) {
-      Runnable writer = new WriterRunnable(cluster, i, paths[i], SEED, latch,
-                                           testFailed);
+      Runnable writer = new WriterRunnable(i, paths[i], SEED, latch, testFailed);
       executor.execute(writer);
     }
 
@@ -677,9 +657,7 @@ public class TestLazyPersistFiles {
   public void testDnRestartWithSavedReplicas()
       throws IOException, InterruptedException {
 
-    startUpCluster(REPL_FACTOR,
-        new StorageType[] {RAM_DISK, DEFAULT },
-        (2 * BLOCK_SIZE - 1));     // 1 replica + delta.
+    startUpCluster(true, -1);
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
 
@@ -703,9 +681,7 @@ public class TestLazyPersistFiles {
   public void testDnRestartWithUnsavedReplicas()
       throws IOException, InterruptedException {
 
-    startUpCluster(REPL_FACTOR,
-                   new StorageType[] {RAM_DISK, DEFAULT },
-                   (2 * BLOCK_SIZE - 1));     // 1 replica + delta.
+    startUpCluster(true, 1);
     stopLazyWriter(cluster.getDataNodes().get(0));
 
     final String METHOD_NAME = GenericTestUtils.getMethodName();
@@ -727,9 +703,8 @@ public class TestLazyPersistFiles {
-   * If ramDiskStorageLimit is >=0, then RAM_DISK capacity is artificially
-   * capped. If ramDiskStorageLimit < 0 then it is ignored.
+   * If ramDiskReplicaCapacity is >= 0, then RAM_DISK capacity is artificially
+   * capped to hold that many full block replicas. If ramDiskReplicaCapacity
+   * < 0 then it is ignored.
   */
-  private void startUpCluster(final int numDataNodes,
-                              final StorageType[] storageTypes,
-                              final long ramDiskStorageLimit,
+  private void startUpCluster(boolean hasTransientStorage,
+                              final int ramDiskReplicaCapacity,
                               final boolean useSCR)
       throws IOException {
 
@@ -739,42 +714,36 @@ public class TestLazyPersistFiles {
                 LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC);
     conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL_SEC);
     conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
-      HEARTBEAT_RECHECK_INTERVAL_MSEC);
+                HEARTBEAT_RECHECK_INTERVAL_MSEC);
     conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
-      LAZY_WRITER_INTERVAL_SEC);
+                LAZY_WRITER_INTERVAL_SEC);
+    conf.setInt(DFS_DATANODE_RAM_DISK_LOW_WATERMARK_REPLICAS,
+                EVICTION_LOW_WATERMARK);
 
-    conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY,useSCR);
+    conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY, useSCR);
 
-    REPL_FACTOR = 1; //Reset in case a test has modified the value
+    long[] capacities = null;
+    if (hasTransientStorage && ramDiskReplicaCapacity >= 0) {
+      // Convert replica count to byte count, add some delta for .meta and VERSION files.
+      long ramDiskStorageLimit = ((long) ramDiskReplicaCapacity * BLOCK_SIZE) + (BLOCK_SIZE - 1);
+      capacities = new long[] { ramDiskStorageLimit, -1 };
+    }
 
     cluster = new MiniDFSCluster
         .Builder(conf)
-        .numDataNodes(numDataNodes)
-        .storageTypes(storageTypes != null ? storageTypes : new StorageType[] { DEFAULT, DEFAULT })
+        .numDataNodes(REPL_FACTOR)
+        .storageCapacities(capacities)
+        .storageTypes(hasTransientStorage ? new StorageType[]{ RAM_DISK, DEFAULT } : null)
         .build();
     fs = cluster.getFileSystem();
     client = fs.getClient();
-
-    // Artificially cap the storage capacity of the RAM_DISK volume.
-    if (ramDiskStorageLimit >= 0) {
-      List<? extends FsVolumeSpi> volumes =
-          cluster.getDataNodes().get(0).getFSDataset().getVolumes();
-
-      for (FsVolumeSpi volume : volumes) {
-        if (volume.getStorageType() == RAM_DISK) {
-          ((FsTransientVolumeImpl) volume).setCapacityForTesting(ramDiskStorageLimit);
-        }
-      }
-    }
-
     LOG.info("Cluster startup complete");
   }
 
-  private void startUpCluster(final int numDataNodes,
-                              final StorageType[] storageTypes,
-                              final long ramDiskStorageLimit)
+  private void startUpCluster(boolean hasTransientStorage,
+                              final int ramDiskReplicaCapacity)
     throws IOException {
-    startUpCluster(numDataNodes, storageTypes, ramDiskStorageLimit, false);
+    startUpCluster(hasTransientStorage, ramDiskReplicaCapacity, false);
   }
 
   private void makeTestFile(Path path, long length, final boolean isLazyPersist)
@@ -908,17 +877,15 @@ public class TestLazyPersistFiles {
 
   class WriterRunnable implements Runnable {
     private final int id;
-    private final MiniDFSCluster cluster;
     private final Path paths[];
     private final int seed;
     private CountDownLatch latch;
     private AtomicBoolean bFail;
 
-    public WriterRunnable(MiniDFSCluster cluster, int threadIndex, Path[] paths,
+    public WriterRunnable(int threadIndex, Path[] paths,
                           int seed, CountDownLatch latch,
                           AtomicBoolean bFail) {
       id = threadIndex;
-      this.cluster = cluster;
       this.paths = paths;
       this.seed = seed;
       this.latch = latch;