This is a plain-text copy of an archived mailing-list message; the link to the canonical (HTML) version was lost when the text was extracted.
Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2014/08/29 08:15:35 UTC
[1/2] git commit: HDFS-6960. Bugfix in LazyWriter,
fix test case and some refactoring. (Arpit Agarwal)
Repository: hadoop
Updated Branches:
refs/heads/HDFS-6581 7e32be876 -> c92837aea
HDFS-6960. Bugfix in LazyWriter, fix test case and some refactoring. (Arpit Agarwal)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4cf9afac
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4cf9afac
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4cf9afac
Branch: refs/heads/HDFS-6581
Commit: 4cf9afacbe3d0814fb616d238aa9b16b1ae68386
Parents: 7e32be8
Author: arp <ar...@apache.org>
Authored: Thu Aug 28 23:05:32 2014 -0700
Committer: arp <ar...@apache.org>
Committed: Thu Aug 28 23:05:32 2014 -0700
----------------------------------------------------------------------
.../hadoop-hdfs/CHANGES-HDFS-6581.txt | 3 +
.../datanode/fsdataset/impl/BlockPoolSlice.java | 11 +-
.../datanode/fsdataset/impl/FsDatasetImpl.java | 129 +++++++++----------
.../datanode/fsdataset/impl/FsVolumeImpl.java | 16 ---
.../fsdataset/impl/LazyWriteReplicaTracker.java | 76 +++++++++--
.../fsdataset/impl/TestLazyPersistFiles.java | 10 +-
6 files changed, 141 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cf9afac/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
index b0fb070..881cb63 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
@@ -24,3 +24,6 @@
HDFS-6928. 'hdfs put' command should accept lazyPersist flag for testing.
(Arpit Agarwal)
+ HDFS-6960. Bugfix in LazyWriter, fix test case and some refactoring.
+ (Arpit Agarwal)
+
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cf9afac/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index 31a254b..1313fef 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -271,10 +271,13 @@ class BlockPoolSlice {
return blockFile;
}
- File lazyPersistReplica(Block b, File f) throws IOException {
- File blockFile = FsDatasetImpl.copyBlockFiles(b, f, lazypersistDir);
- File metaFile = FsDatasetUtil.getMetaFile(blockFile, b.getGenerationStamp());
- dfsUsage.incDfsUsed(b.getNumBytes() + metaFile.length());
+ File lazyPersistReplica(ReplicaInfo replicaInfo) throws IOException {
+ if (!lazypersistDir.exists() && !lazypersistDir.mkdirs()) {
+ FsDatasetImpl.LOG.warn("Failed to create " + lazypersistDir);
+ }
+ File metaFile = FsDatasetImpl.copyBlockFiles(replicaInfo, lazypersistDir);
+ File blockFile = Block.metaToBlockFile(metaFile);
+ dfsUsage.incDfsUsed(replicaInfo.getNumBytes() + metaFile.length());
return blockFile;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cf9afac/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 8643d6b..22f626c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
+import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
import org.apache.hadoop.hdfs.server.datanode.Replica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException;
@@ -565,28 +566,33 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
return dstfile;
}
- static File copyBlockFiles(Block b, File srcfile, File destdir)
+ /**
+ * Copy the block and meta files for the given replica into the block-ID-based subdirectory of destRoot.
+ * @return the new meta file.
+ * @throws IOException
+ */
+ static File copyBlockFiles(ReplicaInfo replicaInfo, File destRoot)
throws IOException {
- final File dstfile = new File(destdir, b.getBlockName());
- final File srcmeta = FsDatasetUtil.getMetaFile(srcfile, b.getGenerationStamp());
- final File dstmeta = FsDatasetUtil.getMetaFile(dstfile, b.getGenerationStamp());
+ final File destDir = DatanodeUtil.idToBlockDir(destRoot, replicaInfo.getBlockId());
+ final File dstFile = new File(destDir, replicaInfo.getBlockName());
+ final File dstMeta = FsDatasetUtil.getMetaFile(dstFile, replicaInfo.getGenerationStamp());
+ final File srcMeta = replicaInfo.getMetaFile();
+ final File srcFile = replicaInfo.getBlockFile();
try {
- FileUtils.copyFile(srcmeta, dstmeta);
+ FileUtils.copyFile(srcMeta, dstMeta);
} catch (IOException e) {
- throw new IOException("Failed to copy meta file for " + b
- + " from " + srcmeta + " to " + dstmeta, e);
+ throw new IOException("Failed to copy " + srcMeta + " to " + dstMeta, e);
}
try {
- FileUtils.copyFile(srcfile, dstfile);
+ FileUtils.copyFile(srcFile, dstFile);
} catch (IOException e) {
- throw new IOException("Failed to copy block file for " + b
- + " from " + srcfile + " to " + dstfile.getAbsolutePath(), e);
+ throw new IOException("Failed to copy " + srcFile + " to " + dstFile, e);
}
if (LOG.isDebugEnabled()) {
- LOG.debug("addBlock: Moved " + srcmeta + " to " + dstmeta
- + " and " + srcfile + " to " + dstfile);
+ LOG.debug("addBlock: Moved " + srcMeta + " to " + dstMeta);
+ LOG.debug("addBlock: Moved " + srcFile + " to " + dstFile);
}
- return dstfile;
+ return dstMeta;
}
static private void truncateBlock(File blockFile, File metaFile,
@@ -1174,10 +1180,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
if (v.isTransientStorage()) {
lazyWriteReplicaTracker.addReplica(bpid, replicaInfo.getBlockId(), v);
-
- // Schedule a checkpoint.
- ((LazyWriter) lazyWriter.getRunnable())
- .addReplicaToLazyWriteQueue(bpid, replicaInfo.getBlockId());
}
}
volumeMap.add(bpid, newReplicaInfo);
@@ -2188,32 +2190,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
nbytes, flags);
}
- private static class BlockIdPair {
- final String bpid;
- final long blockId;
-
- BlockIdPair(final String bpid, final long blockId) {
- this.bpid = bpid;
- this.blockId = blockId;
- }
- }
-
- private class LazyWriter implements Runnable {
+ class LazyWriter implements Runnable {
private volatile boolean shouldRun = true;
final int checkpointerInterval;
- final private Queue<BlockIdPair> blocksPendingCheckpoint;
-
public LazyWriter(final int checkpointerInterval) {
this.checkpointerInterval = checkpointerInterval;
- blocksPendingCheckpoint = new LinkedList<BlockIdPair>();
- }
-
- // Schedule a replica for writing to persistent storage.
- public synchronized void addReplicaToLazyWriteQueue(
- String bpid, long blockId) {
- LOG.info("Block with blockId=" + blockId + "; bpid=" + bpid + " added to lazy writer queue");
- blocksPendingCheckpoint.add(new BlockIdPair(bpid, blockId));
}
private void moveReplicaToNewVolume(String bpid, long blockId)
@@ -2221,76 +2203,85 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
LOG.info("LazyWriter invoked to save blockId=" + blockId + "; bpid=" + bpid);
- FsVolumeImpl targetVolume = null;
- Block block = null;
- File blockFile = null;
+ FsVolumeImpl targetVolume;
+ ReplicaInfo replicaInfo;
synchronized (this) {
- block = getStoredBlock(bpid, blockId);
- blockFile = getFile(bpid, blockId);
+ replicaInfo = volumeMap.get(bpid, blockId);
- if (block == null) {
- // The block was deleted before it could be checkpointed.
+ if (replicaInfo == null || !replicaInfo.getVolume().isTransientStorage()) {
+ // The block was either deleted before it could be checkpointed or
+ // it is already on persistent storage. This can occur if a second
+ // replica on persistent storage was found after the lazy write was
+ // scheduled.
return;
}
// Pick a target volume for the block.
targetVolume = volumes.getNextVolume(
- StorageType.DEFAULT, block.getNumBytes());
+ StorageType.DEFAULT, replicaInfo.getNumBytes());
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("LazyWriter starting to save blockId=" + blockId + "; bpid=" + bpid);
}
- LOG.info("LazyWriter starting to save blockId=" + blockId + "; bpid=" + bpid);
lazyWriteReplicaTracker.recordStartLazyPersist(bpid, blockId, targetVolume);
File savedBlockFile = targetVolume.getBlockPoolSlice(bpid)
- .lazyPersistReplica(block, blockFile);
+ .lazyPersistReplica(replicaInfo);
lazyWriteReplicaTracker.recordEndLazyPersist(bpid, blockId, savedBlockFile);
- LOG.info("LazyWriter finished saving blockId=" + blockId + "; bpid=" + bpid +
- " to file " + savedBlockFile);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("LazyWriter finished saving blockId=" + blockId + "; bpid=" + bpid +
+ " to file " + savedBlockFile);
+ }
}
/**
* Checkpoint a pending replica to persistent storage now.
+ * If we fail then move the replica to the end of the queue.
* @return true if there is more work to be done, false otherwise.
*/
private boolean saveNextReplica() {
- BlockIdPair blockIdPair = null;
- int moreWorkThreshold = 0;
+ LazyWriteReplicaTracker.ReplicaState replicaState = null;
+ boolean succeeded = false;
try {
synchronized (this) {
- // Dequeue the next replica waiting to be checkpointed.
- blockIdPair = blocksPendingCheckpoint.poll();
- if (blockIdPair == null) {
- LOG.info("LazyWriter has no blocks to persist. " +
- "Thread going to sleep.");
+ replicaState = lazyWriteReplicaTracker.dequeueNextReplicaToPersist();
+ if (replicaState == null) {
return false;
}
}
// Move the replica outside the lock.
- moveReplicaToNewVolume(blockIdPair.bpid, blockIdPair.blockId);
-
+ moveReplicaToNewVolume(replicaState.bpid, replicaState.blockId);
+ succeeded = true;
} catch(IOException ioe) {
- // If we failed, put the block on the queue and let a retry
- // interval elapse before we try again so we don't try to keep
- // checkpointing the same block in a tight loop.
- synchronized (this) {
- blocksPendingCheckpoint.add(blockIdPair);
- ++moreWorkThreshold;
+ LOG.warn("Exception saving replica " + replicaState, ioe);
+ } finally {
+ if (!succeeded && replicaState != null) {
+ lazyWriteReplicaTracker.reenqueueReplica(replicaState);
}
}
- synchronized (this) {
- return blocksPendingCheckpoint.size() > moreWorkThreshold;
- }
+ return succeeded;
}
@Override
public void run() {
+ int numSuccessiveFailures = 0;
+
while (fsRunning && shouldRun) {
try {
- if (!saveNextReplica()) {
+ numSuccessiveFailures = saveNextReplica() ? 0 : (numSuccessiveFailures + 1);
+
+ // Sleep if we have no more work to do or if it looks like we are not
+ // making any forward progress. This is to ensure that if all persist
+ // operations are failing we don't keep retrying them in a tight loop.
+ if (numSuccessiveFailures == lazyWriteReplicaTracker.numReplicasNotPersisted()) {
Thread.sleep(checkpointerInterval * 1000);
+ numSuccessiveFailures = 0;
}
} catch (InterruptedException e) {
LOG.info("LazyWriter was interrupted, exiting");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cf9afac/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index d3c585d..85756b7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -255,22 +255,6 @@ public class FsVolumeImpl implements FsVolumeSpi {
getBlockPoolSlice(bpid).getVolumeMap(volumeMap);
}
- /**
- * Add replicas under the given directory to the volume map
- * @param volumeMap the replicas map
- * @param dir an input directory
- * @param isFinalized true if the directory has finalized replicas;
- * false if the directory has rbw replicas
- * @throws IOException
- */
- void addToReplicasMap(String bpid, ReplicaMap volumeMap,
- File dir, boolean isFinalized) throws IOException {
- BlockPoolSlice bp = getBlockPoolSlice(bpid);
- // TODO move this up
- // dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length());
- bp.addToReplicasMap(volumeMap, dir, isFinalized);
- }
-
@Override
public String toString() {
return currentDir.getAbsolutePath();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cf9afac/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java
index ae28f09..222b63a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java
@@ -19,12 +19,11 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
-import com.google.common.collect.Multimap;
import com.google.common.collect.TreeMultimap;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
class LazyWriteReplicaTracker {
@@ -43,7 +42,7 @@ class LazyWriteReplicaTracker {
/**
* transient storage volume that holds the original replica.
*/
- final FsVolumeImpl transientVolume;
+ final FsVolumeSpi transientVolume;
/**
* Persistent volume that holds or will hold the saved replica.
@@ -51,7 +50,7 @@ class LazyWriteReplicaTracker {
FsVolumeImpl lazyPersistVolume;
File savedBlockFile;
- ReplicaState(final String bpid, final long blockId, FsVolumeImpl transientVolume) {
+ ReplicaState(final String bpid, final long blockId, FsVolumeSpi transientVolume) {
this.bpid = bpid;
this.blockId = blockId;
this.transientVolume = transientVolume;
@@ -61,6 +60,11 @@ class LazyWriteReplicaTracker {
}
@Override
+ public String toString() {
+ return "[Bpid=" + bpid + ";blockId=" + blockId + "]";
+ }
+
+ @Override
public int hashCode() {
return bpid.hashCode() ^ (int) blockId;
}
@@ -99,35 +103,43 @@ class LazyWriteReplicaTracker {
final Map<String, Map<Long, ReplicaState>> replicaMaps;
/**
+ * Queue of replicas that need to be written to disk.
+ */
+ final Queue<ReplicaState> replicasNotPersisted;
+
+ /**
* A map of blockId to persist complete time for transient blocks. This allows
* us to evict LRU blocks from transient storage. Protected by 'this'
* Object lock.
*/
- final Map<ReplicaState, Long> persistTimeMap;
+ final Map<ReplicaState, Long> replicasPersisted;
LazyWriteReplicaTracker(final FsDatasetImpl fsDataset) {
this.fsDataset = fsDataset;
replicaMaps = new HashMap<String, Map<Long, ReplicaState>>();
- persistTimeMap = new HashMap<ReplicaState, Long>();
+ replicasNotPersisted = new LinkedList<ReplicaState>();
+ replicasPersisted = new HashMap<ReplicaState, Long>();
}
TreeMultimap<Long, ReplicaState> getLruMap() {
// TODO: This can be made more efficient.
TreeMultimap<Long, ReplicaState> reversedMap = TreeMultimap.create();
- for (Map.Entry<ReplicaState, Long> entry : persistTimeMap.entrySet()) {
+ for (Map.Entry<ReplicaState, Long> entry : replicasPersisted.entrySet()) {
reversedMap.put(entry.getValue(), entry.getKey());
}
return reversedMap;
}
synchronized void addReplica(String bpid, long blockId,
- final FsVolumeImpl transientVolume) {
+ final FsVolumeSpi transientVolume) {
Map<Long, ReplicaState> map = replicaMaps.get(bpid);
if (map == null) {
map = new HashMap<Long, ReplicaState>();
replicaMaps.put(bpid, map);
}
- map.put(blockId, new ReplicaState(bpid, blockId, transientVolume));
+ ReplicaState replicaState = new ReplicaState(bpid, blockId, transientVolume);
+ map.put(blockId, replicaState);
+ replicasNotPersisted.add(replicaState);
}
synchronized void recordStartLazyPersist(
@@ -149,12 +161,49 @@ class LazyWriteReplicaTracker {
}
replicaState.state = State.LAZY_PERSIST_COMPLETE;
replicaState.savedBlockFile = savedBlockFile;
- persistTimeMap.put(replicaState, System.currentTimeMillis() / 1000);
+
+ if (replicasNotPersisted.peek() == replicaState) {
+ // Common case.
+ replicasNotPersisted.remove();
+ } else {
+ // Should never occur in practice as lazy writer always persists
+ // the replica at the head of the queue before moving to the next
+ // one.
+ replicasNotPersisted.remove(replicaState);
+ }
+ replicasPersisted.put(replicaState, System.currentTimeMillis() / 1000);
+ }
+
+ synchronized ReplicaState dequeueNextReplicaToPersist() {
+ while (replicasNotPersisted.size() != 0) {
+ ReplicaState replicaState = replicasNotPersisted.remove();
+ Map<Long, ReplicaState> replicaMap = replicaMaps.get(replicaState.bpid);
+
+ if (replicaMap != null && replicaMap.get(replicaState.blockId) != null) {
+ return replicaState;
+ }
+
+ // The replica no longer exists, look for the next one.
+ }
+ return null;
+ }
+
+ synchronized void reenqueueReplica(final ReplicaState replicaState) {
+ replicasNotPersisted.add(replicaState);
+ }
+
+ synchronized int numReplicasNotPersisted() {
+ return replicasNotPersisted.size();
}
synchronized void discardReplica(
final String bpid, final long blockId, boolean force) {
Map<Long, ReplicaState> map = replicaMaps.get(bpid);
+
+ if (map == null) {
+ return;
+ }
+
ReplicaState replicaState = map.get(blockId);
if (replicaState == null) {
@@ -172,6 +221,9 @@ class LazyWriteReplicaTracker {
}
map.remove(blockId);
- persistTimeMap.remove(replicaState);
+ replicasPersisted.remove(replicaState);
+
+ // Leave the replica in replicasNotPersisted if it's present.
+ // dequeueNextReplicaToPersist will GC it eventually.
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4cf9afac/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
index ddd71b1..af0e8ac 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
@@ -40,7 +40,9 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.test.GenericTestUtils;
@@ -61,6 +63,7 @@ public class TestLazyPersistFiles {
static {
((Log4JLogger) NameNode.blockStateChangeLog).getLogger().setLevel(Level.ALL);
((Log4JLogger) NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger) FsDatasetImpl.LOG).getLogger().setLevel(Level.ALL);
}
private static short REPL_FACTOR = 1;
@@ -68,7 +71,7 @@ public class TestLazyPersistFiles {
private static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3;
private static final long HEARTBEAT_INTERVAL_SEC = 1;
private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
- private static final int LAZY_WRITER_INTERVAL_SEC = 3;
+ private static final int LAZY_WRITER_INTERVAL_SEC = 1;
private static final int BUFFER_LENGTH = 4096;
private MiniDFSCluster cluster;
@@ -283,8 +286,9 @@ public class TestLazyPersistFiles {
File lazyPersistDir = volume.getBlockPoolSlice(bpid).getLazypersistDir();
for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
- File persistedBlockFile = new File(lazyPersistDir, "blk_" + lb.getBlock().getBlockId());
- if (persistedBlockFile.exists()) {
+ File targetDir = DatanodeUtil.idToBlockDir(lazyPersistDir, lb.getBlock().getBlockId());
+ File blockFile = new File(targetDir, lb.getBlock().getBlockName());
+ if (blockFile.exists()) {
// Found a persisted copy for this block!
boolean added = persistedBlockIds.add(lb.getBlock().getBlockId());
assertThat(added, is(true));
[2/2] git commit: HDFS-6931. Move lazily persisted replicas to
finalized directory on DN startup. (Arpit Agarwal)
Posted by ar...@apache.org.
HDFS-6931. Move lazily persisted replicas to finalized directory on DN startup. (Arpit Agarwal)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c92837ae
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c92837ae
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c92837ae
Branch: refs/heads/HDFS-6581
Commit: c92837aeab5188f6171d4016f91b3b4936a66beb
Parents: 4cf9afa
Author: arp <ar...@apache.org>
Authored: Thu Aug 28 23:13:46 2014 -0700
Committer: arp <ar...@apache.org>
Committed: Thu Aug 28 23:13:46 2014 -0700
----------------------------------------------------------------------
.../hadoop-hdfs/CHANGES-HDFS-6581.txt | 3 +
.../datanode/fsdataset/impl/BlockPoolSlice.java | 168 ++++++++++++++++---
.../datanode/fsdataset/impl/FsDatasetImpl.java | 9 +-
.../datanode/fsdataset/impl/FsVolumeImpl.java | 12 +-
.../datanode/fsdataset/impl/FsVolumeList.java | 18 +-
.../fsdataset/impl/TestLazyPersistFiles.java | 53 +++++-
6 files changed, 213 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c92837ae/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
index 881cb63..8791485 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
@@ -27,3 +27,6 @@
HDFS-6960. Bugfix in LazyWriter, fix test case and some refactoring.
(Arpit Agarwal)
+ HDFS-6931. Move lazily persisted replicas to finalized directory on DN
+ startup. (Arpit Agarwal)
+
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c92837ae/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index 1313fef..1bb6680 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -94,17 +94,6 @@ class BlockPoolSlice {
}
}
- // Delete all checkpointed replicas on startup.
- // TODO: We can move checkpointed replicas to the finalized dir and delete
- // the copy on RAM_DISK. For now we take the simpler approach.
-
- FileUtil.fullyDelete(lazypersistDir);
- if (!this.lazypersistDir.exists()) {
- if (!this.lazypersistDir.mkdirs()) {
- throw new IOException("Failed to mkdirs " + this.lazypersistDir);
- }
- }
-
// Files that were being written when the datanode was last shutdown
// are now moved back to the data directory. It is possible that
// in the future, we might want to do some sort of datanode-local
@@ -271,6 +260,13 @@ class BlockPoolSlice {
return blockFile;
}
+ /**
+ * Save the given replica to persistent storage.
+ *
+ * @param replicaInfo
+ * @return The saved block file.
+ * @throws IOException
+ */
File lazyPersistReplica(ReplicaInfo replicaInfo) throws IOException {
if (!lazypersistDir.exists() && !lazypersistDir.mkdirs()) {
FsDatasetImpl.LOG.warn("Failed to create " + lazypersistDir);
@@ -305,11 +301,21 @@ class BlockPoolSlice {
- void getVolumeMap(ReplicaMap volumeMap) throws IOException {
+ void getVolumeMap(ReplicaMap volumeMap,
+ final LazyWriteReplicaTracker lazyWriteReplicaMap)
+ throws IOException {
+ // Recover lazy persist replicas, they will be added to the volumeMap
+ // when we scan the finalized directory.
+ if (lazypersistDir.exists()) {
+ int numRecovered = moveLazyPersistReplicasToFinalized(lazypersistDir);
+ FsDatasetImpl.LOG.info(
+ "Recovered " + numRecovered + " replicas from " + lazypersistDir);
+ }
+
// add finalized replicas
- addToReplicasMap(volumeMap, finalizedDir, true);
+ addToReplicasMap(volumeMap, finalizedDir, lazyWriteReplicaMap, true);
// add rbw replicas
- addToReplicasMap(volumeMap, rbwDir, false);
+ addToReplicasMap(volumeMap, rbwDir, lazyWriteReplicaMap, false);
}
/**
@@ -338,18 +344,68 @@ class BlockPoolSlice {
/**
+ * Move replicas in the lazy persist directory to their corresponding locations
+ * in the finalized directory.
+ * @return number of replicas recovered.
+ */
+ private int moveLazyPersistReplicasToFinalized(File source)
+ throws IOException {
+ File files[] = FileUtil.listFiles(source);
+ int numRecovered = 0;
+ for (File file : files) {
+ if (file.isDirectory()) {
+ numRecovered += moveLazyPersistReplicasToFinalized(file);
+ }
+
+ if (Block.isMetaFilename(file.getName())) {
+ File metaFile = file;
+ File blockFile = Block.metaToBlockFile(metaFile);
+ long blockId = Block.filename2id(blockFile.getName());
+ File targetDir = DatanodeUtil.idToBlockDir(finalizedDir, blockId);
+
+ if (blockFile.exists()) {
+ File targetBlockFile = new File(targetDir, blockFile.getName());
+ File targetMetaFile = new File(targetDir, metaFile.getName());
+
+ if (!targetDir.exists() && !targetDir.mkdirs()) {
+ FsDatasetImpl.LOG.warn("Failed to move " + blockFile + " to " + targetDir);
+ continue;
+ }
+
+ metaFile.renameTo(targetMetaFile);
+ blockFile.renameTo(targetBlockFile);
+
+ if (targetBlockFile.exists() && targetMetaFile.exists()) {
+ ++numRecovered;
+ } else {
+ // Failure should be rare.
+ FsDatasetImpl.LOG.warn("Failed to move " + blockFile + " to " + targetDir);
+ }
+ }
+ }
+ }
+
+ FileUtil.fullyDelete(source);
+ return numRecovered;
+ }
+
+ /**
* Add replicas under the given directory to the volume map
* @param volumeMap the replicas map
* @param dir an input directory
+ * @param lazyWriteReplicaMap Map of replicas on transient
+ * storage.
* @param isFinalized true if the directory has finalized replicas;
* false if the directory has rbw replicas
*/
- void addToReplicasMap(ReplicaMap volumeMap, File dir, boolean isFinalized
- ) throws IOException {
+ void addToReplicasMap(ReplicaMap volumeMap, File dir,
+ final LazyWriteReplicaTracker lazyWriteReplicaMap,
+ boolean isFinalized)
+ throws IOException {
File files[] = FileUtil.listFiles(dir);
for (File file : files) {
if (file.isDirectory()) {
- addToReplicasMap(volumeMap, file, isFinalized);
+ addToReplicasMap(volumeMap, file, lazyWriteReplicaMap, isFinalized);
}
if (isFinalized && FsDatasetUtil.isUnlinkTmpFile(file)) {
@@ -405,13 +461,83 @@ class BlockPoolSlice {
}
}
- ReplicaInfo oldReplica = volumeMap.add(bpid, newReplica);
- if (oldReplica != null) {
- FsDatasetImpl.LOG.warn("Two block files with the same block id exist " +
- "on disk: " + oldReplica.getBlockFile() + " and " + file );
+ ReplicaInfo oldReplica = volumeMap.get(bpid, newReplica.getBlockId());
+ if (oldReplica == null) {
+ volumeMap.add(bpid, newReplica);
+ } else {
+ // We have multiple replicas of the same block so decide which one
+ // to keep.
+ newReplica = resolveDuplicateReplicas(newReplica, oldReplica, volumeMap);
+ }
+
+ // If we are retaining a replica on transient storage make sure
+ // it is in the lazyWriteReplicaMap so it can be persisted
+ // eventually.
+ if (newReplica.getVolume().isTransientStorage()) {
+ lazyWriteReplicaMap.addReplica(bpid, blockId, newReplica.getVolume());
+ } else {
+ lazyWriteReplicaMap.discardReplica(bpid, blockId, true);
}
}
}
+
+ /**
+ * This method is invoked during DN startup when volumes are scanned to
+ * build up the volumeMap.
+ *
+ * Given two replicas, decide which one to keep. The preference is as
+ * follows:
+ * 1. Prefer the replica with the higher generation stamp.
+ * 2. If generation stamps are equal, prefer the replica with the
+ * larger on-disk length.
+ * 3. If on-disk length is the same, prefer the replica on persistent
+ * storage volume.
+ * 4. All other factors being equal, keep replica1.
+ *
+ * The other replica is removed from the volumeMap and is deleted from
+ * its storage volume.
+ *
+ * @param replica1
+ * @param replica2
+ * @param volumeMap
+ * @return the replica that is retained.
+ * @throws IOException
+ */
+ private ReplicaInfo resolveDuplicateReplicas(
+ final ReplicaInfo replica1, final ReplicaInfo replica2,
+ final ReplicaMap volumeMap) throws IOException {
+
+ ReplicaInfo replicaToKeep;
+ ReplicaInfo replicaToDelete;
+
+ if (replica1.getGenerationStamp() != replica2.getGenerationStamp()) {
+ replicaToKeep = replica1.getGenerationStamp() > replica2.getGenerationStamp()
+ ? replica1 : replica2;
+ } else if (replica1.getNumBytes() != replica2.getNumBytes()) {
+ replicaToKeep = replica1.getNumBytes() > replica2.getNumBytes() ?
+ replica1 : replica2;
+ } else if (replica1.getVolume().isTransientStorage() &&
+ !replica2.getVolume().isTransientStorage()) {
+ replicaToKeep = replica2;
+ } else {
+ replicaToKeep = replica1;
+ }
+
+ replicaToDelete = (replicaToKeep == replica1) ? replica2 : replica1;
+
+ // Update volumeMap.
+ volumeMap.add(bpid, replicaToKeep);
+
+ // Delete the files on disk. Failure here is okay.
+ replicaToDelete.getBlockFile().delete();
+ replicaToDelete.getMetaFile().delete();
+
+ FsDatasetImpl.LOG.info(
+ "resolveDuplicateReplicas keeping " + replicaToKeep.getBlockFile() +
+ ", deleting " + replicaToDelete.getBlockFile());
+
+ return replicaToKeep;
+ }
/**
* Find out the number of bytes in the block that match its crc.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c92837ae/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 22f626c..10d98ad 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -262,13 +262,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
volumes = new FsVolumeList(volsFailed, blockChooserImpl);
asyncDiskService = new FsDatasetAsyncDiskService(datanode);
- // TODO: Initialize transientReplicaTracker from blocks on disk.
-
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
addVolume(dataLocations, storage.getStorageDir(idx));
}
cacheManager = new FsDatasetCache(this);
+
+ // Start the lazy writer once we have built the replica maps.
lazyWriter = new Daemon(new LazyWriter(
conf.getInt(DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC)));
@@ -287,7 +287,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
// storageMap and asyncDiskService, consistent.
FsVolumeImpl fsVolume = FsVolumeImplAllocator.createVolume(
this, sd.getStorageUuid(), dir, this.conf, storageType);
- fsVolume.getVolumeMap(volumeMap);
+ fsVolume.getVolumeMap(volumeMap, lazyWriteReplicaTracker);
volumes.addVolume(fsVolume);
storageMap.put(sd.getStorageUuid(),
@@ -2021,7 +2021,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
volumes.addBlockPool(bpid, conf);
volumeMap.initBlockPool(bpid);
}
- volumes.getAllVolumesMap(bpid, volumeMap);
+ volumes.getAllVolumesMap(bpid, volumeMap, lazyWriteReplicaTracker);
}
@Override
@@ -2261,6 +2261,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
LOG.warn("Exception saving replica " + replicaState, ioe);
} finally {
if (!succeeded && replicaState != null) {
+ LOG.warn("Failed to save replica " + replicaState + ". re-enqueueing it.");
lazyWriteReplicaTracker.reenqueueReplica(replicaState);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c92837ae/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index 85756b7..ccfb449 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -245,14 +245,18 @@ public class FsVolumeImpl implements FsVolumeSpi {
}
}
- void getVolumeMap(ReplicaMap volumeMap) throws IOException {
+  /**
+   * Adds the replicas from every block pool slice on this volume to
+   * volumeMap by calling BlockPoolSlice#getVolumeMap on each slice, passing
+   * lazyWriteReplicaMap through unchanged.
+   * NOTE(review): despite its name, lazyWriteReplicaMap is a
+   * LazyWriteReplicaTracker rather than a map.
+   */
+ void getVolumeMap(ReplicaMap volumeMap,
+ final LazyWriteReplicaTracker lazyWriteReplicaMap)
+ throws IOException {
for(BlockPoolSlice s : bpSlices.values()) {
- s.getVolumeMap(volumeMap);
+ s.getVolumeMap(volumeMap, lazyWriteReplicaMap);
}
}
- void getVolumeMap(String bpid, ReplicaMap volumeMap) throws IOException {
- getBlockPoolSlice(bpid).getVolumeMap(volumeMap);
+  /**
+   * Adds the replicas of block pool bpid on this volume to volumeMap by
+   * delegating to that block pool's slice, passing lazyWriteReplicaMap
+   * through unchanged.
+   */
+ void getVolumeMap(String bpid, ReplicaMap volumeMap,
+ final LazyWriteReplicaTracker lazyWriteReplicaMap)
+ throws IOException {
+ getBlockPoolSlice(bpid).getVolumeMap(volumeMap, lazyWriteReplicaMap);
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c92837ae/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
index f1b196a..67958bf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
@@ -119,7 +119,10 @@ class FsVolumeList {
return remaining;
}
- void getAllVolumesMap(final String bpid, final ReplicaMap volumeMap) throws IOException {
+ void getAllVolumesMap(final String bpid,
+ final ReplicaMap volumeMap,
+ final LazyWriteReplicaTracker lazyWriteReplicaMap)
+ throws IOException {
long totalStartTime = Time.monotonicNow();
final List<IOException> exceptions = Collections.synchronizedList(
new ArrayList<IOException>());
@@ -131,7 +134,7 @@ class FsVolumeList {
FsDatasetImpl.LOG.info("Adding replicas to map for block pool " +
bpid + " on volume " + v + "...");
long startTime = Time.monotonicNow();
- v.getVolumeMap(bpid, volumeMap);
+ v.getVolumeMap(bpid, volumeMap, lazyWriteReplicaMap);
long timeTaken = Time.monotonicNow() - startTime;
FsDatasetImpl.LOG.info("Time to add replicas to map for block pool"
+ " " + bpid + " on volume " + v + ": " + timeTaken + "ms");
@@ -160,17 +163,6 @@ class FsVolumeList {
+ totalTimeTaken + "ms");
}
- void getVolumeMap(String bpid, FsVolumeImpl volume, ReplicaMap volumeMap)
- throws IOException {
- FsDatasetImpl.LOG.info("Adding replicas to map for block pool " + bpid +
- " on volume " + volume + "...");
- long startTime = Time.monotonicNow();
- volume.getVolumeMap(bpid, volumeMap);
- long timeTaken = Time.monotonicNow() - startTime;
- FsDatasetImpl.LOG.info("Time to add replicas to map for block pool " + bpid +
- " on volume " + volume + ": " + timeTaken + "ms");
- }
-
/**
* Calls {@link FsVolumeImpl#checkDirs()} on each volume, removing any
* volumes from the active list that result in a DiskErrorException.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c92837ae/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
index af0e8ac..cac99a7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
@@ -265,7 +265,9 @@ public class TestLazyPersistFiles {
LocatedBlocks locatedBlocks = ensureFileReplicasOnStorageType(path, RAM_DISK);
// Sleep for a short time to allow the lazy writer thread to do its job
- Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+ Thread.sleep(6 * LAZY_WRITER_INTERVAL_SEC * 1000);
+
+ LOG.info("Verifying copy was saved to lazyPersist/");
// Make sure that there is a saved copy of the replica on persistent
// storage.
@@ -330,23 +332,52 @@ public class TestLazyPersistFiles {
ensureFileReplicasOnStorageType(path1, DEFAULT);
}
- /**
- * TODO: Stub test, to be completed.
- * Verify that checksum computation is skipped for files written to memory.
- */
+  /**
+   * Writes a one-block test file whose replica is expected on RAM_DISK, then
+   * sleeps for three lazy-writer intervals so that the lazy writer can save
+   * a copy. After the sleep the replica must still be on RAM_DISK.
+   * DataNode 0 is then restarted, and the block is expected on DEFAULT
+   * storage.
+   * NOTE(review): the third startUpCluster argument appears to size the
+   * RAM_DISK capacity to one block ("1 replica + delta") — confirm.
+   */
@Test (timeout=300000)
- public void testChecksumIsSkipped()
+ public void testDnRestartWithSavedReplicas()
throws IOException, InterruptedException {
+
startUpCluster(REPL_FACTOR,
- new StorageType[] {RAM_DISK, DEFAULT }, -1);
+ new StorageType[] {RAM_DISK, DEFAULT },
+ (2 * BLOCK_SIZE - 1)); // 1 replica + delta.
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
makeTestFile(path1, BLOCK_SIZE, true);
ensureFileReplicasOnStorageType(path1, RAM_DISK);
- // Verify checksum was not computed.
+ // Sleep for a short time to allow the lazy writer thread to do its job.
+ // However the block replica should not be evicted from RAM_DISK yet.
+ Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+ ensureFileReplicasOnStorageType(path1, RAM_DISK);
+ LOG.info("Restarting the DataNode");
+ cluster.restartDataNode(0, true);
+ cluster.waitActive();
+
+ // Ensure that the replica is now on persistent storage.
+ ensureFileReplicasOnStorageType(path1, DEFAULT);
+ }
+
+  /**
+   * Stops the lazy writer on DataNode 0 before any data is written, so no
+   * copy of the replica is saved. It then writes a one-block test file whose
+   * replica is expected on RAM_DISK and restarts DataNode 0. The block is
+   * expected to still be reported on RAM_DISK afterwards.
+   * NOTE(review): the stop must take effect before makeTestFile runs;
+   * stopLazyWriter does not wait for the daemon thread to exit — confirm
+   * this cannot race.
+   */
+ @Test (timeout=300000)
+ public void testDnRestartWithUnsavedReplicas()
+ throws IOException, InterruptedException {
+
+ startUpCluster(REPL_FACTOR,
+ new StorageType[] {RAM_DISK, DEFAULT },
+ (2 * BLOCK_SIZE - 1)); // 1 replica + delta.
+ stopLazyWriter(cluster.getDataNodes().get(0));
+
+ final String METHOD_NAME = GenericTestUtils.getMethodName();
+ Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+ makeTestFile(path1, BLOCK_SIZE, true);
+ ensureFileReplicasOnStorageType(path1, RAM_DISK);
+
+ LOG.info("Restarting the DataNode");
+ cluster.restartDataNode(0, true);
+ cluster.waitActive();
+
+ // Ensure that the replica is still on transient storage.
+ ensureFileReplicasOnStorageType(path1, RAM_DISK);
}
// ---- Utility functions for all test cases -------------------------------
@@ -443,4 +474,10 @@ public class TestLazyPersistFiles {
return locatedBlocks;
}
+
+ private void stopLazyWriter(DataNode dn) {
+ // Stop the lazyWriter daemon.
+ FsDatasetImpl fsDataset = ((FsDatasetImpl) dn.getFSDataset());
+ ((FsDatasetImpl.LazyWriter) fsDataset.lazyWriter.getRunnable()).stop();
+ }
}