Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2014/08/29 08:15:35 UTC

[1/2] git commit: HDFS-6960. Bugfix in LazyWriter, fix test case and some refactoring. (Arpit Agarwal)

Repository: hadoop
Updated Branches:
  refs/heads/HDFS-6581 7e32be876 -> c92837aea


HDFS-6960. Bugfix in LazyWriter, fix test case and some refactoring. (Arpit Agarwal)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4cf9afac
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4cf9afac
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4cf9afac

Branch: refs/heads/HDFS-6581
Commit: 4cf9afacbe3d0814fb616d238aa9b16b1ae68386
Parents: 7e32be8
Author: arp <ar...@apache.org>
Authored: Thu Aug 28 23:05:32 2014 -0700
Committer: arp <ar...@apache.org>
Committed: Thu Aug 28 23:05:32 2014 -0700

----------------------------------------------------------------------
 .../hadoop-hdfs/CHANGES-HDFS-6581.txt           |   3 +
 .../datanode/fsdataset/impl/BlockPoolSlice.java |  11 +-
 .../datanode/fsdataset/impl/FsDatasetImpl.java  | 129 +++++++++----------
 .../datanode/fsdataset/impl/FsVolumeImpl.java   |  16 ---
 .../fsdataset/impl/LazyWriteReplicaTracker.java |  76 +++++++++--
 .../fsdataset/impl/TestLazyPersistFiles.java    |  10 +-
 6 files changed, 141 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cf9afac/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
index b0fb070..881cb63 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
@@ -24,3 +24,6 @@
     HDFS-6928. 'hdfs put' command should accept lazyPersist flag for testing.
     (Arpit Agarwal)
 
+    HDFS-6960. Bugfix in LazyWriter, fix test case and some refactoring.
+    (Arpit Agarwal)
+

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cf9afac/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index 31a254b..1313fef 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -271,10 +271,13 @@ class BlockPoolSlice {
     return blockFile;
   }
 
-  File lazyPersistReplica(Block b, File f) throws IOException {
-    File blockFile = FsDatasetImpl.copyBlockFiles(b, f, lazypersistDir);
-    File metaFile = FsDatasetUtil.getMetaFile(blockFile, b.getGenerationStamp());
-    dfsUsage.incDfsUsed(b.getNumBytes() + metaFile.length());
+  File lazyPersistReplica(ReplicaInfo replicaInfo) throws IOException {
+    if (!lazypersistDir.exists() && !lazypersistDir.mkdirs()) {
+      FsDatasetImpl.LOG.warn("Failed to create " + lazypersistDir);
+    }
+    File metaFile = FsDatasetImpl.copyBlockFiles(replicaInfo, lazypersistDir);
+    File blockFile = Block.metaToBlockFile(metaFile);
+    dfsUsage.incDfsUsed(replicaInfo.getNumBytes() + metaFile.length());
     return blockFile;
   }
 

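The reworked lazyPersistReplica() above takes a ReplicaInfo, creates the lazypersist directory on demand, and derives the block file from the returned meta file. A standalone sketch of that copy-then-return shape, assuming commons-io and simplified names (persistCopy is illustrative, not the committed API; dfsUsage accounting is omitted):

    import java.io.File;
    import java.io.IOException;
    import org.apache.commons.io.FileUtils;

    class LazyPersistSketch {
      // Copy a block file and its meta file under destDir, creating the
      // directory on demand, and return the new block file location.
      static File persistCopy(File srcBlock, File srcMeta, File destDir)
          throws IOException {
        if (!destDir.exists() && !destDir.mkdirs()) {
          throw new IOException("Failed to create " + destDir);
        }
        File dstMeta = new File(destDir, srcMeta.getName());
        File dstBlock = new File(destDir, srcBlock.getName());
        FileUtils.copyFile(srcMeta, dstMeta);    // meta first, as in the diff
        FileUtils.copyFile(srcBlock, dstBlock);
        return dstBlock;
      }
    }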
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cf9afac/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 8643d6b..22f626c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
+import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
 import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.Replica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException;
@@ -565,28 +566,33 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     return dstfile;
   }
 
-  static File copyBlockFiles(Block b, File srcfile, File destdir)
+  /**
+   * Copy the block and meta files for the given replica to the destination root.
+   * @return the new meta file.
+   * @throws IOException
+   */
+  static File copyBlockFiles(ReplicaInfo replicaInfo, File destRoot)
       throws IOException {
-    final File dstfile = new File(destdir, b.getBlockName());
-    final File srcmeta = FsDatasetUtil.getMetaFile(srcfile, b.getGenerationStamp());
-    final File dstmeta = FsDatasetUtil.getMetaFile(dstfile, b.getGenerationStamp());
+    final File destDir = DatanodeUtil.idToBlockDir(destRoot, replicaInfo.getBlockId());
+    final File dstFile = new File(destDir, replicaInfo.getBlockName());
+    final File dstMeta = FsDatasetUtil.getMetaFile(dstFile, replicaInfo.getGenerationStamp());
+    final File srcMeta = replicaInfo.getMetaFile();
+    final File srcFile = replicaInfo.getBlockFile();
     try {
-      FileUtils.copyFile(srcmeta, dstmeta);
+      FileUtils.copyFile(srcMeta, dstMeta);
     } catch (IOException e) {
-      throw new IOException("Failed to copy meta file for " + b
-          + " from " + srcmeta + " to " + dstmeta, e);
+      throw new IOException("Failed to copy " + srcMeta + " to " + dstMeta, e);
     }
     try {
-      FileUtils.copyFile(srcfile, dstfile);
+      FileUtils.copyFile(srcFile, dstFile);
     } catch (IOException e) {
-      throw new IOException("Failed to copy block file for " + b
-          + " from " + srcfile + " to " + dstfile.getAbsolutePath(), e);
+      throw new IOException("Failed to copy " + srcFile + " to " + dstFile, e);
     }
     if (LOG.isDebugEnabled()) {
-      LOG.debug("addBlock: Moved " + srcmeta + " to " + dstmeta
-          + " and " + srcfile + " to " + dstfile);
+      LOG.debug("addBlock: Moved " + srcMeta + " to " + dstMeta);
+      LOG.debug("addBlock: Moved " + srcFile + " to " + dstFile);
     }
-    return dstfile;
+    return dstMeta;
   }
 
   static private void truncateBlock(File blockFile, File metaFile,
@@ -1174,10 +1180,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
       if (v.isTransientStorage()) {
         lazyWriteReplicaTracker.addReplica(bpid, replicaInfo.getBlockId(), v);
-
-        // Schedule a checkpoint.
-        ((LazyWriter) lazyWriter.getRunnable())
-            .addReplicaToLazyWriteQueue(bpid, replicaInfo.getBlockId());
       }
     }
     volumeMap.add(bpid, newReplicaInfo);
@@ -2188,32 +2190,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         nbytes, flags);
   }
 
-  private static class BlockIdPair {
-    final String bpid;
-    final long blockId;
-
-    BlockIdPair(final String bpid, final long blockId) {
-      this.bpid = bpid;
-      this.blockId = blockId;
-    }
-  }
-
-  private class LazyWriter implements Runnable {
+  class LazyWriter implements Runnable {
     private volatile boolean shouldRun = true;
     final int checkpointerInterval;
 
-    final private Queue<BlockIdPair> blocksPendingCheckpoint;
-
     public LazyWriter(final int checkpointerInterval) {
       this.checkpointerInterval = checkpointerInterval;
-      blocksPendingCheckpoint = new LinkedList<BlockIdPair>();
-    }
-
-    // Schedule a replica for writing to persistent storage.
-    public synchronized void addReplicaToLazyWriteQueue(
-        String bpid, long blockId) {
-      LOG.info("Block with blockId=" + blockId + "; bpid=" + bpid + " added to lazy writer queue");
-      blocksPendingCheckpoint.add(new BlockIdPair(bpid, blockId));
     }
 
     private void moveReplicaToNewVolume(String bpid, long blockId)
@@ -2221,76 +2203,85 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
       LOG.info("LazyWriter invoked to save blockId=" + blockId + "; bpid=" + bpid);
 
-      FsVolumeImpl targetVolume = null;
-      Block block = null;
-      File blockFile = null;
+      FsVolumeImpl targetVolume;
+      ReplicaInfo replicaInfo;
 
       synchronized (this) {
-        block = getStoredBlock(bpid, blockId);
-        blockFile = getFile(bpid, blockId);
+        replicaInfo = volumeMap.get(bpid, blockId);
 
-        if (block == null) {
-          // The block was deleted before it could be checkpointed.
+        if (replicaInfo == null || !replicaInfo.getVolume().isTransientStorage()) {
+          // The block was either deleted before it could be checkpointed or
+          // it is already on persistent storage. This can occur if a second
+          // replica on persistent storage was found after the lazy write was
+          // scheduled.
           return;
         }
 
         // Pick a target volume for the block.
         targetVolume = volumes.getNextVolume(
-            StorageType.DEFAULT, block.getNumBytes());
+            StorageType.DEFAULT, replicaInfo.getNumBytes());
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("LazyWriter starting to save blockId=" + blockId + "; bpid=" + bpid);
       }
 
-      LOG.info("LazyWriter starting to save blockId=" + blockId + "; bpid=" + bpid);
       lazyWriteReplicaTracker.recordStartLazyPersist(bpid, blockId, targetVolume);
       File savedBlockFile = targetVolume.getBlockPoolSlice(bpid)
-                                        .lazyPersistReplica(block, blockFile);
+                                        .lazyPersistReplica(replicaInfo);
       lazyWriteReplicaTracker.recordEndLazyPersist(bpid, blockId, savedBlockFile);
-      LOG.info("LazyWriter finished saving blockId=" + blockId + "; bpid=" + bpid +
-          " to file " + savedBlockFile);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("LazyWriter finished saving blockId=" + blockId + "; bpid=" + bpid +
+                 " to file " + savedBlockFile);
+      }
     }
 
     /**
      * Checkpoint a pending replica to persistent storage now.
+     * On failure the replica is moved to the end of the queue.
      * @return true if there is more work to be done, false otherwise.
      */
     private boolean saveNextReplica() {
-      BlockIdPair blockIdPair = null;
-      int moreWorkThreshold = 0;
+      LazyWriteReplicaTracker.ReplicaState replicaState = null;
+      boolean succeeded = false;
 
       try {
         synchronized (this) {
-          // Dequeue the next replica waiting to be checkpointed.
-          blockIdPair = blocksPendingCheckpoint.poll();
-          if (blockIdPair == null) {
-            LOG.info("LazyWriter has no blocks to persist. " +
-                "Thread going to sleep.");
+          replicaState = lazyWriteReplicaTracker.dequeueNextReplicaToPersist();
+          if (replicaState == null) {
             return false;
           }
         }
 
         // Move the replica outside the lock.
-        moveReplicaToNewVolume(blockIdPair.bpid, blockIdPair.blockId);
-
+        moveReplicaToNewVolume(replicaState.bpid, replicaState.blockId);
+        succeeded = true;
       } catch(IOException ioe) {
-        // If we failed, put the block on the queue and let a retry
-        // interval elapse before we try again so we don't try to keep
-        // checkpointing the same block in a tight loop.
-        synchronized (this) {
-          blocksPendingCheckpoint.add(blockIdPair);
-          ++moreWorkThreshold;
+        LOG.warn("Exception saving replica " + replicaState, ioe);
+      } finally {
+        if (!succeeded && replicaState != null) {
+          lazyWriteReplicaTracker.reenqueueReplica(replicaState);
         }
       }
 
-      synchronized (this) {
-        return blocksPendingCheckpoint.size() > moreWorkThreshold;
-      }
+      return succeeded;
     }
 
     @Override
     public void run() {
+      int numSuccessiveFailures = 0;
+
       while (fsRunning && shouldRun) {
         try {
-          if (!saveNextReplica()) {
+          numSuccessiveFailures = saveNextReplica() ? 0 : (numSuccessiveFailures + 1);
+
+          // Sleep if we have no more work to do or if it looks like we are not
+          // making any forward progress. This is to ensure that if all persist
+          // operations are failing we don't keep retrying them in a tight loop.
+          if (numSuccessiveFailures == lazyWriteReplicaTracker.numReplicasNotPersisted()) {
             Thread.sleep(checkpointerInterval * 1000);
+            numSuccessiveFailures = 0;
           }
         } catch (InterruptedException e) {
           LOG.info("LazyWriter was interrupted, exiting");

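The rewritten run() loop above replaces the LazyWriter's private queue with the shared tracker and sleeps only once the count of successive failures catches up with the number of pending replicas, i.e. when no forward progress is being made. A self-contained sketch of that backoff pattern, with Tracker standing in for LazyWriteReplicaTracker (all names here are illustrative):

    // Backoff loop: sleep when every pending replica has failed in turn,
    // so one bad replica cannot spin the thread in a tight retry loop.
    interface Tracker {
      Object dequeueNextReplicaToPersist();
      void reenqueueReplica(Object r);
      int numReplicasNotPersisted();
    }

    class BackoffLoopSketch implements Runnable {
      private final Tracker tracker;
      private final int intervalSec;
      private volatile boolean shouldRun = true;

      BackoffLoopSketch(Tracker tracker, int intervalSec) {
        this.tracker = tracker;
        this.intervalSec = intervalSec;
      }

      // Returns true only if a replica was dequeued and persisted.
      private boolean saveNextReplica() {
        Object replica = tracker.dequeueNextReplicaToPersist();
        if (replica == null) {
          return false;                       // nothing pending
        }
        try {
          persist(replica);                   // may throw
          return true;
        } catch (Exception e) {
          tracker.reenqueueReplica(replica);  // retry later, at the back
          return false;
        }
      }

      private void persist(Object replica) throws Exception {
        // copy the replica's files to persistent storage
      }

      @Override
      public void run() {
        int numSuccessiveFailures = 0;
        while (shouldRun) {
          try {
            numSuccessiveFailures =
                saveNextReplica() ? 0 : numSuccessiveFailures + 1;
            // >= rather than == keeps this safe when the queue is empty.
            if (numSuccessiveFailures >= tracker.numReplicasNotPersisted()) {
              Thread.sleep(intervalSec * 1000L);
              numSuccessiveFailures = 0;
            }
          } catch (InterruptedException e) {
            return;
          }
        }
      }
    }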
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cf9afac/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index d3c585d..85756b7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -255,22 +255,6 @@ public class FsVolumeImpl implements FsVolumeSpi {
     getBlockPoolSlice(bpid).getVolumeMap(volumeMap);
   }
   
-  /**
-   * Add replicas under the given directory to the volume map
-   * @param volumeMap the replicas map
-   * @param dir an input directory
-   * @param isFinalized true if the directory has finalized replicas;
-   *                    false if the directory has rbw replicas
-   * @throws IOException 
-   */
-  void addToReplicasMap(String bpid, ReplicaMap volumeMap, 
-      File dir, boolean isFinalized) throws IOException {
-    BlockPoolSlice bp = getBlockPoolSlice(bpid);
-    // TODO move this up
-    // dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length());
-    bp.addToReplicasMap(volumeMap, dir, isFinalized);
-  }
-
   @Override
   public String toString() {
     return currentDir.getAbsolutePath();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cf9afac/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java
index ae28f09..222b63a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java
@@ -19,12 +19,11 @@
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 
-import com.google.common.collect.Multimap;
 import com.google.common.collect.TreeMultimap;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 
 import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
 
 class LazyWriteReplicaTracker {
 
@@ -43,7 +42,7 @@ class LazyWriteReplicaTracker {
     /**
      * transient storage volume that holds the original replica.
      */
-    final FsVolumeImpl transientVolume;
+    final FsVolumeSpi transientVolume;
 
     /**
      * Persistent volume that holds or will hold the saved replica.
@@ -51,7 +50,7 @@ class LazyWriteReplicaTracker {
     FsVolumeImpl lazyPersistVolume;
     File savedBlockFile;
 
-    ReplicaState(final String bpid, final long blockId, FsVolumeImpl transientVolume) {
+    ReplicaState(final String bpid, final long blockId, FsVolumeSpi transientVolume) {
       this.bpid = bpid;
       this.blockId = blockId;
       this.transientVolume = transientVolume;
@@ -61,6 +60,11 @@ class LazyWriteReplicaTracker {
     }
 
     @Override
+    public String toString() {
+      return "[Bpid=" + bpid + ";blockId=" + blockId + "]";
+    }
+
+    @Override
     public int hashCode() {
       return bpid.hashCode() ^ (int) blockId;
     }
@@ -99,35 +103,43 @@ class LazyWriteReplicaTracker {
   final Map<String, Map<Long, ReplicaState>> replicaMaps;
 
   /**
+   * Queue of replicas that need to be written to disk.
+   */
+  final Queue<ReplicaState> replicasNotPersisted;
+
+  /**
    * A map of blockId to persist complete time for transient blocks. This allows
    * us to evict LRU blocks from transient storage. Protected by 'this'
    * Object lock.
    */
-  final Map<ReplicaState, Long> persistTimeMap;
+  final Map<ReplicaState, Long> replicasPersisted;
 
   LazyWriteReplicaTracker(final FsDatasetImpl fsDataset) {
     this.fsDataset = fsDataset;
     replicaMaps = new HashMap<String, Map<Long, ReplicaState>>();
-    persistTimeMap = new HashMap<ReplicaState, Long>();
+    replicasNotPersisted = new LinkedList<ReplicaState>();
+    replicasPersisted = new HashMap<ReplicaState, Long>();
   }
 
   TreeMultimap<Long, ReplicaState> getLruMap() {
     // TODO: This can be made more efficient.
     TreeMultimap<Long, ReplicaState> reversedMap = TreeMultimap.create();
-    for (Map.Entry<ReplicaState, Long> entry : persistTimeMap.entrySet()) {
+    for (Map.Entry<ReplicaState, Long> entry : replicasPersisted.entrySet()) {
       reversedMap.put(entry.getValue(), entry.getKey());
     }
     return reversedMap;
   }
 
   synchronized void addReplica(String bpid, long blockId,
-                               final FsVolumeImpl transientVolume) {
+                               final FsVolumeSpi transientVolume) {
     Map<Long, ReplicaState> map = replicaMaps.get(bpid);
     if (map == null) {
       map = new HashMap<Long, ReplicaState>();
       replicaMaps.put(bpid, map);
     }
-    map.put(blockId, new ReplicaState(bpid, blockId, transientVolume));
+    ReplicaState replicaState = new ReplicaState(bpid, blockId, transientVolume);
+    map.put(blockId, replicaState);
+    replicasNotPersisted.add(replicaState);
   }
 
   synchronized void recordStartLazyPersist(
@@ -149,12 +161,49 @@ class LazyWriteReplicaTracker {
     }
     replicaState.state = State.LAZY_PERSIST_COMPLETE;
     replicaState.savedBlockFile = savedBlockFile;
-    persistTimeMap.put(replicaState, System.currentTimeMillis() / 1000);
+
+    if (replicasNotPersisted.peek() == replicaState) {
+      // Common case.
+      replicasNotPersisted.remove();
+    } else {
+      // Should never occur in practice as lazy writer always persists
+      // the replica at the head of the queue before moving to the next
+      // one.
+      replicasNotPersisted.remove(replicaState);
+    }
+    replicasPersisted.put(replicaState, System.currentTimeMillis() / 1000);
+  }
+
+  synchronized ReplicaState dequeueNextReplicaToPersist() {
+    while (replicasNotPersisted.size() != 0) {
+      ReplicaState replicaState = replicasNotPersisted.remove();
+      Map<Long, ReplicaState> replicaMap = replicaMaps.get(replicaState.bpid);
+
+      if (replicaMap != null && replicaMap.get(replicaState.blockId) != null) {
+        return replicaState;
+      }
+
+      // The replica no longer exists; look for the next one.
+    }
+    return null;
+  }
+
+  synchronized void reenqueueReplica(final ReplicaState replicaState) {
+    replicasNotPersisted.add(replicaState);
+  }
+
+  synchronized int numReplicasNotPersisted() {
+    return replicasNotPersisted.size();
   }
 
   synchronized void discardReplica(
       final String bpid, final long blockId, boolean force) {
     Map<Long, ReplicaState> map = replicaMaps.get(bpid);
+
+    if (map == null) {
+      return;
+    }
+
     ReplicaState replicaState = map.get(blockId);
 
     if (replicaState == null) {
@@ -172,6 +221,9 @@ class LazyWriteReplicaTracker {
     }
 
     map.remove(blockId);
-    persistTimeMap.remove(replicaState);
+    replicasPersisted.remove(replicaState);
+
+    // Leave the replica in replicasNotPersisted if it's present.
+    // dequeueNextReplicaToPersist will GC it eventually.
   }
 }

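dequeueNextReplicaToPersist() above garbage-collects stale queue entries lazily: discardReplica() only removes the replica from the maps, and the queue entry is skipped the next time it reaches the head. A compact sketch of that pattern with simplified types (LazyGcQueueSketch is illustrative, not the committed class):

    import java.util.ArrayDeque;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Queue;

    // A queue whose dead entries are dropped at dequeue time rather than
    // searched for on every discard: O(1) discard, amortized O(1) poll.
    class LazyGcQueueSketch<K, V> {
      private final Queue<K> pending = new ArrayDeque<K>();
      private final Map<K, V> live = new HashMap<K, V>();

      synchronized void add(K key, V value) {
        live.put(key, value);
        pending.add(key);
      }

      synchronized void discard(K key) {
        live.remove(key);   // leave the queue entry; poll will skip it
      }

      synchronized K dequeueNextLive() {
        while (!pending.isEmpty()) {
          K key = pending.remove();
          if (live.containsKey(key)) {
            return key;     // still live, hand it to the caller
          }
          // stale entry: its replica was discarded, keep looking
        }
        return null;
      }
    }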
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cf9afac/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
index ddd71b1..af0e8ac 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
@@ -40,7 +40,9 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -61,6 +63,7 @@ public class TestLazyPersistFiles {
   static {
     ((Log4JLogger) NameNode.blockStateChangeLog).getLogger().setLevel(Level.ALL);
     ((Log4JLogger) NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) FsDatasetImpl.LOG).getLogger().setLevel(Level.ALL);
   }
 
   private static short REPL_FACTOR = 1;
@@ -68,7 +71,7 @@ public class TestLazyPersistFiles {
   private static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3;
   private static final long HEARTBEAT_INTERVAL_SEC = 1;
   private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
-  private static final int LAZY_WRITER_INTERVAL_SEC = 3;
+  private static final int LAZY_WRITER_INTERVAL_SEC = 1;
   private static final int BUFFER_LENGTH = 4096;
 
   private MiniDFSCluster cluster;
@@ -283,8 +286,9 @@ public class TestLazyPersistFiles {
       File lazyPersistDir = volume.getBlockPoolSlice(bpid).getLazypersistDir();
 
       for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
-        File persistedBlockFile = new File(lazyPersistDir, "blk_" + lb.getBlock().getBlockId());
-        if (persistedBlockFile.exists()) {
+        File targetDir = DatanodeUtil.idToBlockDir(lazyPersistDir, lb.getBlock().getBlockId());
+        File blockFile = new File(targetDir, lb.getBlock().getBlockName());
+        if (blockFile.exists()) {
           // Found a persisted copy for this block!
           boolean added = persistedBlockIds.add(lb.getBlock().getBlockId());
           assertThat(added, is(true));

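The test change above stops probing for a flat "blk_<id>" file and instead asks DatanodeUtil.idToBlockDir() for the block-id based subdirectory, matching the layout copyBlockFiles now writes into. Roughly, that layout hashes bits of the block id into two nested "subdir" levels; the sketch below is illustrative only (the exact masks and constants belong to DatanodeUtil, which this diff does not show):

    import java.io.File;

    final class BlockDirSketch {
      // Map a block id to a two-level subdirectory under root, in the
      // spirit of DatanodeUtil.idToBlockDir(); bit positions are assumed.
      static File idToBlockDir(File root, long blockId) {
        int d1 = (int) ((blockId >> 16) & 0xFF);
        int d2 = (int) ((blockId >> 8) & 0xFF);
        return new File(root, "subdir" + d1 + File.separator + "subdir" + d2);
      }
    }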

[2/2] git commit: HDFS-6931. Move lazily persisted replicas to finalized directory on DN startup. (Arpit Agarwal)

Posted by ar...@apache.org.
HDFS-6931. Move lazily persisted replicas to finalized directory on DN startup. (Arpit Agarwal)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c92837ae
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c92837ae
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c92837ae

Branch: refs/heads/HDFS-6581
Commit: c92837aeab5188f6171d4016f91b3b4936a66beb
Parents: 4cf9afa
Author: arp <ar...@apache.org>
Authored: Thu Aug 28 23:13:46 2014 -0700
Committer: arp <ar...@apache.org>
Committed: Thu Aug 28 23:13:46 2014 -0700

----------------------------------------------------------------------
 .../hadoop-hdfs/CHANGES-HDFS-6581.txt           |   3 +
 .../datanode/fsdataset/impl/BlockPoolSlice.java | 168 ++++++++++++++++---
 .../datanode/fsdataset/impl/FsDatasetImpl.java  |   9 +-
 .../datanode/fsdataset/impl/FsVolumeImpl.java   |  12 +-
 .../datanode/fsdataset/impl/FsVolumeList.java   |  18 +-
 .../fsdataset/impl/TestLazyPersistFiles.java    |  53 +++++-
 6 files changed, 213 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c92837ae/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
index 881cb63..8791485 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
@@ -27,3 +27,6 @@
     HDFS-6960. Bugfix in LazyWriter, fix test case and some refactoring.
     (Arpit Agarwal)
 
+    HDFS-6931. Move lazily persisted replicas to finalized directory on DN
+    startup. (Arpit Agarwal)
+

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c92837ae/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index 1313fef..1bb6680 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -94,17 +94,6 @@ class BlockPoolSlice {
       }
     }
 
-    // Delete all checkpointed replicas on startup.
-    // TODO: We can move checkpointed replicas to the finalized dir and delete
-    //       the copy on RAM_DISK. For now we take the simpler approach.
-
-    FileUtil.fullyDelete(lazypersistDir);
-    if (!this.lazypersistDir.exists()) {
-      if (!this.lazypersistDir.mkdirs()) {
-        throw new IOException("Failed to mkdirs " + this.lazypersistDir);
-      }
-    }
-
     // Files that were being written when the datanode was last shutdown
     // are now moved back to the data directory. It is possible that
     // in the future, we might want to do some sort of datanode-local
@@ -271,6 +260,13 @@ class BlockPoolSlice {
     return blockFile;
   }
 
+  /**
+   * Save the given replica to persistent storage.
+   *
+   * @param replicaInfo the replica to save.
+   * @return The saved block file.
+   * @throws IOException
+   */
   File lazyPersistReplica(ReplicaInfo replicaInfo) throws IOException {
     if (!lazypersistDir.exists() && !lazypersistDir.mkdirs()) {
       FsDatasetImpl.LOG.warn("Failed to create " + lazypersistDir);
@@ -305,11 +301,21 @@ class BlockPoolSlice {
 
 
     
-  void getVolumeMap(ReplicaMap volumeMap) throws IOException {
+  void getVolumeMap(ReplicaMap volumeMap,
+                    final LazyWriteReplicaTracker lazyWriteReplicaMap)
+      throws IOException {
+    // Recover lazy persist replicas; they will be added to the volumeMap
+    // when we scan the finalized directory.
+    if (lazypersistDir.exists()) {
+      int numRecovered = moveLazyPersistReplicasToFinalized(lazypersistDir);
+      FsDatasetImpl.LOG.info(
+          "Recovered " + numRecovered + " replicas from " + lazypersistDir);
+    }
+
     // add finalized replicas
-    addToReplicasMap(volumeMap, finalizedDir, true);
+    addToReplicasMap(volumeMap, finalizedDir, lazyWriteReplicaMap, true);
     // add rbw replicas
-    addToReplicasMap(volumeMap, rbwDir, false);
+    addToReplicasMap(volumeMap, rbwDir, lazyWriteReplicaMap, false);
   }
 
   /**
@@ -338,18 +344,68 @@ class BlockPoolSlice {
 
 
   /**
+   * Move replicas in the lazy persist directory to their corresponding locations
+   * in the finalized directory.
+   * @return number of replicas recovered.
+   */
+  private int moveLazyPersistReplicasToFinalized(File source)
+      throws IOException {
+    File files[] = FileUtil.listFiles(source);
+    int numRecovered = 0;
+    for (File file : files) {
+      if (file.isDirectory()) {
+        numRecovered += moveLazyPersistReplicasToFinalized(file);
+      }
+
+      if (Block.isMetaFilename(file.getName())) {
+        File metaFile = file;
+        File blockFile = Block.metaToBlockFile(metaFile);
+        long blockId = Block.filename2id(blockFile.getName());
+        File targetDir = DatanodeUtil.idToBlockDir(finalizedDir, blockId);
+
+        if (blockFile.exists()) {
+          File targetBlockFile = new File(targetDir, blockFile.getName());
+          File targetMetaFile = new File(targetDir, metaFile.getName());
+
+          if (!targetDir.exists() && !targetDir.mkdirs()) {
+            FsDatasetImpl.LOG.warn("Failed to create " + targetDir + ", skipping " + blockFile);
+            continue;
+          }
+
+          metaFile.renameTo(targetMetaFile);
+          blockFile.renameTo(targetBlockFile);
+
+          if (targetBlockFile.exists() && targetMetaFile.exists()) {
+            ++numRecovered;
+          } else {
+            // Failure should be rare.
+            FsDatasetImpl.LOG.warn("Failed to move " + blockFile + " to " + targetDir);
+          }
+        }
+      }
+    }
+
+    FileUtil.fullyDelete(source);
+    return numRecovered;
+  }
+
+  /**
    * Add replicas under the given directory to the volume map
    * @param volumeMap the replicas map
    * @param dir an input directory
+   * @param lazyWriteReplicaMap Map of replicas on transient
+   *                                storage.
    * @param isFinalized true if the directory has finalized replicas;
    *                    false if the directory has rbw replicas
    */
-  void addToReplicasMap(ReplicaMap volumeMap, File dir, boolean isFinalized
-      ) throws IOException {
+  void addToReplicasMap(ReplicaMap volumeMap, File dir,
+                        final LazyWriteReplicaTracker lazyWriteReplicaMap,
+                        boolean isFinalized)
+      throws IOException {
     File files[] = FileUtil.listFiles(dir);
     for (File file : files) {
       if (file.isDirectory()) {
-        addToReplicasMap(volumeMap, file, isFinalized);
+        addToReplicasMap(volumeMap, file, lazyWriteReplicaMap, isFinalized);
       }
 
       if (isFinalized && FsDatasetUtil.isUnlinkTmpFile(file)) {
@@ -405,13 +461,83 @@ class BlockPoolSlice {
         }
       }
 
-      ReplicaInfo oldReplica = volumeMap.add(bpid, newReplica);
-      if (oldReplica != null) {
-        FsDatasetImpl.LOG.warn("Two block files with the same block id exist " +
-            "on disk: " + oldReplica.getBlockFile() + " and " + file );
+      ReplicaInfo oldReplica = volumeMap.get(bpid, newReplica.getBlockId());
+      if (oldReplica == null) {
+        volumeMap.add(bpid, newReplica);
+      } else {
+        // We have multiple replicas of the same block so decide which one
+        // to keep.
+        newReplica = resolveDuplicateReplicas(newReplica, oldReplica, volumeMap);
+      }
+
+      // If we are retaining a replica on transient storage make sure
+      // it is in the lazyWriteReplicaMap so it can be persisted
+      // eventually.
+      if (newReplica.getVolume().isTransientStorage()) {
+        lazyWriteReplicaMap.addReplica(bpid, blockId, newReplica.getVolume());
+      } else {
+        lazyWriteReplicaMap.discardReplica(bpid, blockId, true);
       }
     }
   }
+
+  /**
+   * This method is invoked during DN startup when volumes are scanned to
+   * build up the volumeMap.
+   *
+   * Given two replicas, decide which one to keep. The preference is as
+   * follows:
+   *   1. Prefer the replica with the higher generation stamp.
+   *   2. If generation stamps are equal, prefer the replica with the
+   *      larger on-disk length.
+   *   3. If on-disk length is the same, prefer the replica on persistent
+   *      storage volume.
+   *   4. All other factors being equal, keep replica1.
+   *
+   * The other replica is removed from the volumeMap and is deleted from
+   * its storage volume.
+   *
+   * @param replica1
+   * @param replica2
+   * @param volumeMap
+   * @return the replica that is retained.
+   * @throws IOException
+   */
+  private ReplicaInfo resolveDuplicateReplicas(
+      final ReplicaInfo replica1, final ReplicaInfo replica2,
+      final ReplicaMap volumeMap) throws IOException {
+
+    ReplicaInfo replicaToKeep;
+    ReplicaInfo replicaToDelete;
+
+    if (replica1.getGenerationStamp() != replica2.getGenerationStamp()) {
+      replicaToKeep = replica1.getGenerationStamp() > replica2.getGenerationStamp()
+          ? replica1 : replica2;
+    } else if (replica1.getNumBytes() != replica2.getNumBytes()) {
+      replicaToKeep = replica1.getNumBytes() > replica2.getNumBytes() ?
+          replica1 : replica2;
+    } else if (replica1.getVolume().isTransientStorage() &&
+               !replica2.getVolume().isTransientStorage()) {
+      replicaToKeep = replica2;
+    } else {
+      replicaToKeep = replica1;
+    }
+
+    replicaToDelete = (replicaToKeep == replica1) ? replica2 : replica1;
+
+    // Update volumeMap.
+    volumeMap.add(bpid, replicaToKeep);
+
+    // Delete the files on disk. Failure here is okay.
+    replicaToDelete.getBlockFile().delete();
+    replicaToDelete.getMetaFile().delete();
+
+    FsDatasetImpl.LOG.info(
+        "resolveDuplicateReplicas keeping " + replicaToKeep.getBlockFile() +
+        ", deleting " + replicaToDelete.getBlockFile());
+
+    return replicaToKeep;
+  }
   
   /**
    * Find out the number of bytes in the block that match its crc.

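The preference order documented in resolveDuplicateReplicas() above (generation stamp, then on-disk length, then persistent over transient storage, then keep replica1) can also be read as a comparator. A sketch under that reading, with ReplicaLike standing in for ReplicaInfo:

    import java.util.Comparator;

    interface ReplicaLike {
      long getGenerationStamp();
      long getNumBytes();
      boolean isOnTransientStorage();
    }

    class ReplicaPreferenceSketch {
      // Higher rank wins; ties fall through to the next rule.
      static final Comparator<ReplicaLike> PREFERENCE =
          Comparator.comparingLong(ReplicaLike::getGenerationStamp)         // 1.
              .thenComparingLong(ReplicaLike::getNumBytes)                  // 2.
              .thenComparing((ReplicaLike r) -> !r.isOnTransientStorage()); // 3.

      // Rule 4: keep replica1 when everything else is equal.
      static ReplicaLike choose(ReplicaLike r1, ReplicaLike r2) {
        return PREFERENCE.compare(r1, r2) >= 0 ? r1 : r2;
      }
    }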
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c92837ae/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 22f626c..10d98ad 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -262,13 +262,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     volumes = new FsVolumeList(volsFailed, blockChooserImpl);
     asyncDiskService = new FsDatasetAsyncDiskService(datanode);
 
-    // TODO: Initialize transientReplicaTracker from blocks on disk.
-
     for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
       addVolume(dataLocations, storage.getStorageDir(idx));
     }
 
     cacheManager = new FsDatasetCache(this);
+
+    // Start the lazy writer once we have built the replica maps.
     lazyWriter = new Daemon(new LazyWriter(
         conf.getInt(DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
                     DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC)));
@@ -287,7 +287,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     // storageMap and asyncDiskService, consistent.
     FsVolumeImpl fsVolume = FsVolumeImplAllocator.createVolume(
         this, sd.getStorageUuid(), dir, this.conf, storageType);
-    fsVolume.getVolumeMap(volumeMap);
+    fsVolume.getVolumeMap(volumeMap, lazyWriteReplicaTracker);
 
     volumes.addVolume(fsVolume);
     storageMap.put(sd.getStorageUuid(),
@@ -2021,7 +2021,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       volumes.addBlockPool(bpid, conf);
       volumeMap.initBlockPool(bpid);
     }
-    volumes.getAllVolumesMap(bpid, volumeMap);
+    volumes.getAllVolumesMap(bpid, volumeMap, lazyWriteReplicaTracker);
   }
 
   @Override
@@ -2261,6 +2261,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         LOG.warn("Exception saving replica " + replicaState, ioe);
       } finally {
         if (!succeeded && replicaState != null) {
+          LOG.warn("Failed to save replica " + replicaState + ". re-enqueueing it.");
           lazyWriteReplicaTracker.reenqueueReplica(replicaState);
         }
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c92837ae/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index 85756b7..ccfb449 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -245,14 +245,18 @@ public class FsVolumeImpl implements FsVolumeSpi {
     }
   }
     
-  void getVolumeMap(ReplicaMap volumeMap) throws IOException {
+  void getVolumeMap(ReplicaMap volumeMap,
+                    final LazyWriteReplicaTracker lazyWriteReplicaMap)
+      throws IOException {
     for(BlockPoolSlice s : bpSlices.values()) {
-      s.getVolumeMap(volumeMap);
+      s.getVolumeMap(volumeMap, lazyWriteReplicaMap);
     }
   }
   
-  void getVolumeMap(String bpid, ReplicaMap volumeMap) throws IOException {
-    getBlockPoolSlice(bpid).getVolumeMap(volumeMap);
+  void getVolumeMap(String bpid, ReplicaMap volumeMap,
+                    final LazyWriteReplicaTracker lazyWriteReplicaMap)
+      throws IOException {
+    getBlockPoolSlice(bpid).getVolumeMap(volumeMap, lazyWriteReplicaMap);
   }
   
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c92837ae/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
index f1b196a..67958bf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
@@ -119,7 +119,10 @@ class FsVolumeList {
     return remaining;
   }
   
-  void getAllVolumesMap(final String bpid, final ReplicaMap volumeMap) throws IOException {
+  void getAllVolumesMap(final String bpid,
+                        final ReplicaMap volumeMap,
+                        final LazyWriteReplicaTracker lazyWriteReplicaMap)
+      throws IOException {
     long totalStartTime = Time.monotonicNow();
     final List<IOException> exceptions = Collections.synchronizedList(
         new ArrayList<IOException>());
@@ -131,7 +134,7 @@ class FsVolumeList {
             FsDatasetImpl.LOG.info("Adding replicas to map for block pool " +
                 bpid + " on volume " + v + "...");
             long startTime = Time.monotonicNow();
-            v.getVolumeMap(bpid, volumeMap);
+            v.getVolumeMap(bpid, volumeMap, lazyWriteReplicaMap);
             long timeTaken = Time.monotonicNow() - startTime;
             FsDatasetImpl.LOG.info("Time to add replicas to map for block pool"
                 + " " + bpid + " on volume " + v + ": " + timeTaken + "ms");
@@ -160,17 +163,6 @@ class FsVolumeList {
         + totalTimeTaken + "ms");
   }
 
-  void getVolumeMap(String bpid, FsVolumeImpl volume, ReplicaMap volumeMap)
-      throws IOException {
-    FsDatasetImpl.LOG.info("Adding replicas to map for block pool " + bpid +
-                               " on volume " + volume + "...");
-    long startTime = Time.monotonicNow();
-    volume.getVolumeMap(bpid, volumeMap);
-    long timeTaken = Time.monotonicNow() - startTime;
-    FsDatasetImpl.LOG.info("Time to add replicas to map for block pool " + bpid +
-                               " on volume " + volume + ": " + timeTaken + "ms");
-  }
-    
   /**
    * Calls {@link FsVolumeImpl#checkDirs()} on each volume, removing any
    * volumes from the active list that result in a DiskErrorException.

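getAllVolumesMap() above fans the per-volume scans out to one thread per volume and collects IOExceptions into a synchronized list, rethrowing after all scanners have joined. A minimal sketch of that fan-out shape (Volume is a stand-in for FsVolumeImpl):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    class ParallelScanSketch {
      interface Volume {
        void scan() throws IOException;
      }

      static void scanAll(List<Volume> volumes) throws IOException {
        final List<IOException> exceptions =
            Collections.synchronizedList(new ArrayList<IOException>());
        List<Thread> scanners = new ArrayList<Thread>();
        for (final Volume v : volumes) {
          Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
              try {
                v.scan();
              } catch (IOException e) {
                exceptions.add(e);       // remember, keep other scans going
              }
            }
          });
          scanners.add(t);
          t.start();
        }
        for (Thread t : scanners) {
          try {
            t.join();
          } catch (InterruptedException e) {
            throw new IOException("Interrupted while scanning volumes", e);
          }
        }
        if (!exceptions.isEmpty()) {
          throw exceptions.get(0);       // surface the first failure
        }
      }
    }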
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c92837ae/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
index af0e8ac..cac99a7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
@@ -265,7 +265,9 @@ public class TestLazyPersistFiles {
     LocatedBlocks locatedBlocks = ensureFileReplicasOnStorageType(path, RAM_DISK);
 
     // Sleep for a short time to allow the lazy writer thread to do its job
-    Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+    Thread.sleep(6 * LAZY_WRITER_INTERVAL_SEC * 1000);
+    
+    LOG.info("Verifying copy was saved to lazyPersist/");
 
     // Make sure that there is a saved copy of the replica on persistent
     // storage.
@@ -330,23 +332,52 @@ public class TestLazyPersistFiles {
     ensureFileReplicasOnStorageType(path1, DEFAULT);
   }
 
-  /**
-   * TODO: Stub test, to be completed.
-   * Verify that checksum computation is skipped for files written to memory.
-   */
   @Test (timeout=300000)
-  public void testChecksumIsSkipped()
+  public void testDnRestartWithSavedReplicas()
       throws IOException, InterruptedException {
+
     startUpCluster(REPL_FACTOR,
-                   new StorageType[] {RAM_DISK, DEFAULT }, -1);
+        new StorageType[] {RAM_DISK, DEFAULT },
+        (2 * BLOCK_SIZE - 1));     // 1 replica + delta.
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
 
     makeTestFile(path1, BLOCK_SIZE, true);
     ensureFileReplicasOnStorageType(path1, RAM_DISK);
 
-    // Verify checksum was not computed.
+    // Sleep for a short time to allow the lazy writer thread to do its job.
+    // However the block replica should not be evicted from RAM_DISK yet.
+    Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+    ensureFileReplicasOnStorageType(path1, RAM_DISK);
 
+    LOG.info("Restarting the DataNode");
+    cluster.restartDataNode(0, true);
+    cluster.waitActive();
+
+    // Ensure that the replica is now on persistent storage.
+    ensureFileReplicasOnStorageType(path1, DEFAULT);
+  }
+
+  @Test (timeout=300000)
+  public void testDnRestartWithUnsavedReplicas()
+      throws IOException, InterruptedException {
+
+    startUpCluster(REPL_FACTOR,
+                   new StorageType[] {RAM_DISK, DEFAULT },
+                   (2 * BLOCK_SIZE - 1));     // 1 replica + delta.
+    stopLazyWriter(cluster.getDataNodes().get(0));
+
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+    makeTestFile(path1, BLOCK_SIZE, true);
+    ensureFileReplicasOnStorageType(path1, RAM_DISK);
+
+    LOG.info("Restarting the DataNode");
+    cluster.restartDataNode(0, true);
+    cluster.waitActive();
+
+    // Ensure that the replica is still on transient storage.
+    ensureFileReplicasOnStorageType(path1, RAM_DISK);
   }
 
   // ---- Utility functions for all test cases -------------------------------
@@ -443,4 +474,10 @@ public class TestLazyPersistFiles {
 
     return locatedBlocks;
   }
+
+  private void stopLazyWriter(DataNode dn) {
+    // Stop the lazyWriter daemon.
+    FsDatasetImpl fsDataset = ((FsDatasetImpl) dn.getFSDataset());
+    ((FsDatasetImpl.LazyWriter) fsDataset.lazyWriter.getRunnable()).stop();
+  }
 }
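stopLazyWriter() above casts the daemon's runnable to FsDatasetImpl.LazyWriter and calls a stop() method this diff does not show; presumably it clears the volatile shouldRun flag seen in the first commit. A sketch of that cooperative-shutdown idiom:

    // A polling daemon stopped by flipping a volatile flag; an interrupt
    // wakes the thread promptly if it is sleeping between polls.
    class StoppableDaemonSketch implements Runnable {
      private volatile boolean shouldRun = true;
      private volatile Thread self;

      public void stop() {
        shouldRun = false;
        Thread t = self;
        if (t != null) {
          t.interrupt();
        }
      }

      @Override
      public void run() {
        self = Thread.currentThread();
        while (shouldRun) {
          try {
            doWork();
            Thread.sleep(1000);
          } catch (InterruptedException e) {
            return;   // treat interruption as a stop signal
          }
        }
      }

      private void doWork() {
        // one unit of background work
      }
    }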