You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2014/10/17 23:45:00 UTC
[13/34] git commit: HDFS-6991. Notify NN of evicted block before
deleting it from RAM disk. (Arpit Agarwal)
HDFS-6991. Notify NN of evicted block before deleting it from RAM disk. (Arpit Agarwal)
Conflicts:
hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b8a2eb79
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b8a2eb79
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b8a2eb79
Branch: refs/heads/branch-2
Commit: b8a2eb793a4cac7a7961204df9d4ca4600cdbcf0
Parents: 3abf34a
Author: arp <ar...@apache.org>
Authored: Mon Sep 8 14:29:30 2014 -0700
Committer: Jitendra Pandey <Ji...@Jitendra-Pandeys-MacBook-Pro-4.local>
Committed: Fri Oct 17 13:42:01 2014 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 4 +
.../server/blockmanagement/BlockManager.java | 4 +-
.../hdfs/server/datanode/BPOfferService.java | 6 -
.../hadoop/hdfs/server/datanode/DataNode.java | 2 +-
.../datanode/fsdataset/impl/FsDatasetImpl.java | 101 +++++++-----
.../org/apache/hadoop/hdfs/MiniDFSCluster.java | 3 +-
.../fsdataset/impl/TestLazyPersistFiles.java | 157 ++++++++-----------
7 files changed, 130 insertions(+), 147 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8a2eb79/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index e12b01a..f6c0180 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -129,6 +129,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_DEFAULT = 4;
public static final String DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC = "dfs.datanode.lazywriter.interval.sec";
public static final int DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC = 60;
+ public static final String DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT = "dfs.datanode.ram.disk.low.watermark.percent";
+ public static final int DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT_DEFAULT = 10;
+ public static final String DFS_DATANODE_RAM_DISK_LOW_WATERMARK_REPLICAS = "dfs.datanode.ram.disk.low.watermark.replicas";
+ public static final int DFS_DATANODE_RAM_DISK_LOW_WATERMARK_REPLICAS_DEFAULT = 3;
public static final String DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT =
"dfs.namenode.path.based.cache.block.map.allocation.percent";
public static final float DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT_DEFAULT = 0.25f;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8a2eb79/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index c4dfa68..878002d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -2119,8 +2119,8 @@ public class BlockManager {
// Add replica if appropriate. If the replica was previously corrupt
// but now okay, it might need to be updated.
if (reportedState == ReplicaState.FINALIZED
- && (!storedBlock.findDatanode(dn)
- || corruptReplicas.isReplicaCorrupt(storedBlock, dn))) {
+ && (storedBlock.findStorageInfo(storageInfo) == -1 ||
+ corruptReplicas.isReplicaCorrupt(storedBlock, dn))) {
toAdd.add(storedBlock);
}
return storedBlock;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8a2eb79/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index 2eab87f..1455a8a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -230,7 +230,6 @@ class BPOfferService {
void notifyNamenodeReceivedBlock(
ExtendedBlock block, String delHint, String storageUuid) {
checkBlock(block);
- checkDelHint(delHint);
ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(
block.getLocalBlock(),
ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK,
@@ -249,11 +248,6 @@ class BPOfferService {
block.getBlockPoolId(), getBlockPoolId());
}
- private void checkDelHint(String delHint) {
- Preconditions.checkArgument(delHint != null,
- "delHint is null");
- }
-
void notifyNamenodeDeletedBlock(ExtendedBlock block, String storageUuid) {
checkBlock(block);
ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8a2eb79/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index a4db56a..bf09899 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -897,7 +897,7 @@ public class DataNode extends ReconfigurableBase
}
// calls specific to BP
- protected void notifyNamenodeReceivedBlock(
+ public void notifyNamenodeReceivedBlock(
ExtendedBlock block, String delHint, String storageUuid) {
BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
if(bpos != null) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8a2eb79/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 19260db..2b16a65 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -47,7 +47,6 @@ import javax.management.StandardMBean;
import com.google.common.collect.Lists;
import com.google.common.base.Preconditions;
-import com.google.common.collect.TreeMultimap;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -282,9 +281,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
cacheManager = new FsDatasetCache(this);
// Start the lazy writer once we have built the replica maps.
- lazyWriter = new Daemon(new LazyWriter(
- conf.getInt(DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
- DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC)));
+ lazyWriter = new Daemon(new LazyWriter(conf));
lazyWriter.start();
registerMBean(datanode.getDatanodeUuid());
}
@@ -2294,16 +2291,23 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
private volatile boolean shouldRun = true;
final int checkpointerInterval;
final long estimateBlockSize;
+ final int lowWatermarkFreeSpacePercentage;
+ final int lowWatermarkFreeSpaceReplicas;
- public static final int LOW_WATERMARK_FREE_SPACE_PERCENT = 10;
- public static final int LOW_WATERMARK_FREE_SPACE_REPLICAS = 3;
-
- public LazyWriter(final int checkpointerInterval) {
- this.checkpointerInterval = checkpointerInterval;
+ public LazyWriter(Configuration conf) {
+ this.checkpointerInterval = conf.getInt(
+ DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
+ DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC);
this.estimateBlockSize = conf.getLongBytes(
DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
+ this.lowWatermarkFreeSpacePercentage = conf.getInt(
+ DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT,
+ DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT_DEFAULT);
+ this.lowWatermarkFreeSpaceReplicas = conf.getInt(
+ DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_REPLICAS,
+ DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_REPLICAS_DEFAULT);
}
private void moveReplicaToNewVolume(String bpid, long blockId)
@@ -2390,49 +2394,63 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
int percentFree = (int) (free * 100 / capacity);
- return percentFree < LOW_WATERMARK_FREE_SPACE_PERCENT ||
- free < (estimateBlockSize * LOW_WATERMARK_FREE_SPACE_REPLICAS);
+ return percentFree < lowWatermarkFreeSpacePercentage ||
+ free < (estimateBlockSize * lowWatermarkFreeSpaceReplicas);
}
/**
* Attempt to evict one or more transient block replicas until we have at
* least spaceNeeded bytes free.
*/
- private synchronized void evictBlocks() throws IOException {
+ private void evictBlocks() throws IOException {
int iterations = 0;
- LazyWriteReplicaTracker.ReplicaState replicaState =
- lazyWriteReplicaTracker.getNextCandidateForEviction();
-
- while (replicaState != null &&
- iterations++ < MAX_BLOCK_EVICTIONS_PER_ITERATION &
+ while (iterations++ < MAX_BLOCK_EVICTIONS_PER_ITERATION &&
transientFreeSpaceBelowThreshold()) {
+ LazyWriteReplicaTracker.ReplicaState replicaState =
+ lazyWriteReplicaTracker.getNextCandidateForEviction();
+
if (LOG.isDebugEnabled()) {
- LOG.info("Evicting block " + replicaState);
+ LOG.debug("Evicting block " + replicaState);
}
- ReplicaInfo replicaInfo = getReplicaInfo(replicaState.bpid, replicaState.blockId);
- Preconditions.checkState(replicaInfo.getVolume().isTransientStorage());
- File blockFile = replicaInfo.getBlockFile();
- File metaFile = replicaInfo.getMetaFile();
- long blockFileUsed = blockFile.length();
- long metaFileUsed = metaFile.length();
- lazyWriteReplicaTracker.discardReplica(replicaState, false);
-
- // Move the replica from lazyPersist/ to finalized/ on target volume
- BlockPoolSlice bpSlice =
- replicaState.lazyPersistVolume.getBlockPoolSlice(replicaState.bpid);
- File newBlockFile = bpSlice.activateSavedReplica(
- replicaInfo, replicaState.savedBlockFile);
-
- ReplicaInfo newReplicaInfo =
- new FinalizedReplica(replicaInfo.getBlockId(),
- replicaInfo.getBytesOnDisk(),
- replicaInfo.getGenerationStamp(),
- replicaState.lazyPersistVolume,
- newBlockFile.getParentFile());
-
- // Update the volumeMap entry. This removes the old entry.
- volumeMap.add(replicaState.bpid, newReplicaInfo);
+
+ ReplicaInfo replicaInfo, newReplicaInfo;
+ File blockFile, metaFile;
+ long blockFileUsed, metaFileUsed;
+
+ synchronized (FsDatasetImpl.this) {
+ replicaInfo = getReplicaInfo(replicaState.bpid, replicaState.blockId);
+ Preconditions.checkState(replicaInfo.getVolume().isTransientStorage());
+ blockFile = replicaInfo.getBlockFile();
+ metaFile = replicaInfo.getMetaFile();
+ blockFileUsed = blockFile.length();
+ metaFileUsed = metaFile.length();
+ lazyWriteReplicaTracker.discardReplica(replicaState, false);
+
+ // Move the replica from lazyPersist/ to finalized/ on target volume
+ BlockPoolSlice bpSlice =
+ replicaState.lazyPersistVolume.getBlockPoolSlice(replicaState.bpid);
+ File newBlockFile = bpSlice.activateSavedReplica(
+ replicaInfo, replicaState.savedBlockFile);
+
+ newReplicaInfo =
+ new FinalizedReplica(replicaInfo.getBlockId(),
+ replicaInfo.getBytesOnDisk(),
+ replicaInfo.getGenerationStamp(),
+ replicaState.lazyPersistVolume,
+ newBlockFile.getParentFile());
+
+ // Update the volumeMap entry.
+ volumeMap.add(replicaState.bpid, newReplicaInfo);
+ }
+
+ // Before deleting the files from transient storage we must notify the
+ // NN that the files are on the new storage. Else a blockReport from
+ // the transient storage might cause the NN to think the blocks are lost.
+ ExtendedBlock extendedBlock =
+ new ExtendedBlock(replicaState.bpid, newReplicaInfo);
+ datanode.notifyNamenodeReceivedBlock(
+ extendedBlock, null, newReplicaInfo.getStorageUuid());
// Remove the old replicas from transient storage.
if (blockFile.delete() || !blockFile.exists()) {
@@ -2444,7 +2462,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
// If deletion failed then the directory scanner will cleanup the blocks
// eventually.
- replicaState = lazyWriteReplicaTracker.getNextCandidateForEviction();
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8a2eb79/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index 338744d..7034715 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -1343,6 +1343,7 @@ public class MiniDFSCluster {
}
int curDatanodesNum = dataNodes.size();
+ final int curDatanodesNumSaved = curDatanodesNum;
// for miniclusters, the default initialDelay for BRs is 0
if (conf.get(DFS_BLOCKREPORT_INITIAL_DELAY_KEY) == null) {
conf.setLong(DFS_BLOCKREPORT_INITIAL_DELAY_KEY, 0);
@@ -1481,7 +1482,7 @@ public class MiniDFSCluster {
waitActive();
if (storageCapacities != null) {
- for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; ++i) {
+ for (int i = curDatanodesNumSaved; i < curDatanodesNumSaved+numDataNodes; ++i) {
final int index = i - curDatanodesNum;
List<? extends FsVolumeSpi> volumes = dns[index].getFSDataset().getVolumes();
assert storageCapacities[index].length == storagesPerDatanode;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8a2eb79/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
index 7dfba50..fcc4798 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
@@ -23,7 +23,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
-import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
@@ -71,13 +70,14 @@ public class TestLazyPersistFiles {
private static final int THREADPOOL_SIZE = 10;
- private static short REPL_FACTOR = 1;
+ private static final short REPL_FACTOR = 1;
private static final int BLOCK_SIZE = 10485760; // 10 MB
private static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3;
private static final long HEARTBEAT_INTERVAL_SEC = 1;
private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
private static final int LAZY_WRITER_INTERVAL_SEC = 1;
private static final int BUFFER_LENGTH = 4096;
+ private static final int EVICTION_LOW_WATERMARK = 1;
private MiniDFSCluster cluster;
private DistributedFileSystem fs;
@@ -101,7 +101,7 @@ public class TestLazyPersistFiles {
@Test (timeout=300000)
public void testFlagNotSetByDefault() throws IOException {
- startUpCluster(REPL_FACTOR, null, -1);
+ startUpCluster(false, -1);
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path = new Path("/" + METHOD_NAME + ".dat");
@@ -113,7 +113,7 @@ public class TestLazyPersistFiles {
@Test (timeout=300000)
public void testFlagPropagation() throws IOException {
- startUpCluster(REPL_FACTOR, null, -1);
+ startUpCluster(false, -1);
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path = new Path("/" + METHOD_NAME + ".dat");
@@ -125,7 +125,7 @@ public class TestLazyPersistFiles {
@Test (timeout=300000)
public void testFlagPersistenceInEditLog() throws IOException {
- startUpCluster(REPL_FACTOR, null, -1);
+ startUpCluster(false, -1);
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path = new Path("/" + METHOD_NAME + ".dat");
@@ -139,10 +139,9 @@ public class TestLazyPersistFiles {
@Test (timeout=300000)
public void testFlagPersistenceInFsImage() throws IOException {
- startUpCluster(REPL_FACTOR, null, -1);
+ startUpCluster(false, -1);
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path = new Path("/" + METHOD_NAME + ".dat");
- FSDataOutputStream fos = null;
makeTestFile(path, 0, true);
// checkpoint
@@ -158,7 +157,7 @@ public class TestLazyPersistFiles {
@Test (timeout=300000)
public void testPlacementOnRamDisk() throws IOException {
- startUpCluster(REPL_FACTOR, new StorageType[] { DEFAULT, RAM_DISK}, -1);
+ startUpCluster(true, -1);
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path = new Path("/" + METHOD_NAME + ".dat");
@@ -168,8 +167,7 @@ public class TestLazyPersistFiles {
@Test (timeout=300000)
public void testPlacementOnSizeLimitedRamDisk() throws IOException {
- startUpCluster(REPL_FACTOR, new StorageType[] { DEFAULT, RAM_DISK },
- 3 * BLOCK_SIZE -1); // 2 replicas + delta
+ startUpCluster(true, 3);
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
@@ -188,7 +186,7 @@ public class TestLazyPersistFiles {
*/
@Test (timeout=300000)
public void testFallbackToDisk() throws IOException {
- startUpCluster(REPL_FACTOR, null, -1);
+ startUpCluster(false, -1);
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path = new Path("/" + METHOD_NAME + ".dat");
@@ -202,7 +200,7 @@ public class TestLazyPersistFiles {
*/
@Test (timeout=300000)
public void testFallbackToDiskFull() throws IOException {
- startUpCluster(REPL_FACTOR, null, BLOCK_SIZE - 1);
+ startUpCluster(false, 0);
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path = new Path("/" + METHOD_NAME + ".dat");
@@ -213,15 +211,13 @@ public class TestLazyPersistFiles {
/**
* File partially fit in RamDisk after eviction.
* RamDisk can fit 2 blocks. Write a file with 5 blocks.
- * Expect 2 blocks are on RamDisk whereas other 3 on disk.
+ * Expect 2 or fewer blocks on RamDisk and 3 or more on disk.
* @throws IOException
*/
@Test (timeout=300000)
public void testFallbackToDiskPartial()
throws IOException, InterruptedException {
- startUpCluster(REPL_FACTOR,
- new StorageType[] { RAM_DISK, DEFAULT },
- BLOCK_SIZE * 3 - 1);
+ startUpCluster(true, 2);
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path = new Path("/" + METHOD_NAME + ".dat");
@@ -241,12 +237,15 @@ public class TestLazyPersistFiles {
for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
if (locatedBlock.getStorageTypes()[0] == RAM_DISK) {
numBlocksOnRamDisk++;
- }else if (locatedBlock.getStorageTypes()[0] == DEFAULT) {
+ } else if (locatedBlock.getStorageTypes()[0] == DEFAULT) {
numBlocksOnDisk++;
}
}
- assertThat(numBlocksOnRamDisk, is(2));
- assertThat(numBlocksOnDisk, is(3));
+
+ // Since eviction is asynchronous, depending on the timing of eviction
+ // relative to writes, we may get 2 or fewer blocks on RAM disk.
+ assert(numBlocksOnRamDisk <= 2);
+ assert(numBlocksOnDisk >= 3);
}
/**
@@ -257,7 +256,7 @@ public class TestLazyPersistFiles {
*/
@Test (timeout=300000)
public void testRamDiskNotChosenByDefault() throws IOException {
- startUpCluster(REPL_FACTOR, new StorageType[] {RAM_DISK, RAM_DISK}, -1);
+ startUpCluster(true, -1);
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path = new Path("/" + METHOD_NAME + ".dat");
@@ -275,7 +274,7 @@ public class TestLazyPersistFiles {
*/
@Test (timeout=300000)
public void testAppendIsDenied() throws IOException {
- startUpCluster(REPL_FACTOR, new StorageType[] {RAM_DISK, DEFAULT }, -1);
+ startUpCluster(true, -1);
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path = new Path("/" + METHOD_NAME + ".dat");
@@ -297,17 +296,12 @@ public class TestLazyPersistFiles {
@Test (timeout=300000)
public void testLazyPersistFilesAreDiscarded()
throws IOException, InterruptedException {
- startUpCluster(REPL_FACTOR,
- new StorageType[] { RAM_DISK, DEFAULT },
- (2 * BLOCK_SIZE - 1)); // 1 replica + delta.
+ startUpCluster(true, 2);
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
- Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
makeTestFile(path1, BLOCK_SIZE, true);
- makeTestFile(path2, BLOCK_SIZE, false);
ensureFileReplicasOnStorageType(path1, RAM_DISK);
- ensureFileReplicasOnStorageType(path2, DEFAULT);
// Stop the DataNode and sleep for the time it takes the NN to
// detect the DN as being dead.
@@ -315,30 +309,28 @@ public class TestLazyPersistFiles {
Thread.sleep(30000L);
assertThat(cluster.getNamesystem().getNumDeadDataNodes(), is(1));
- // Next, wait for the replication monitor to mark the file as
- // corrupt, plus some delta.
+ // Next, wait for the replication monitor to mark the file as corrupt
Thread.sleep(2 * DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT * 1000);
- // Wait for the LazyPersistFileScrubber to run, plus some delta.
+ // Wait for the LazyPersistFileScrubber to run
Thread.sleep(2 * LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC * 1000);
// Ensure that path1 does not exist anymore.
assert(!fs.exists(path1));
- assert(fs.exists(path2));
- // We should have only one block that needs replication i.e. the one
+ // We should have zero blocks that need replication now that path1 has
// been discarded.
assertThat(cluster.getNameNode()
.getNamesystem()
.getBlockManager()
.getUnderReplicatedBlocksCount(),
- is(1L));
+ is(0L));
}
@Test (timeout=300000)
public void testLazyPersistBlocksAreSaved()
throws IOException, InterruptedException {
- startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT }, -1);
+ startUpCluster(true, -1);
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path = new Path("/" + METHOD_NAME + ".dat");
@@ -386,16 +378,12 @@ public class TestLazyPersistFiles {
/**
* RamDisk eviction after lazy persist to disk.
- * Evicted blocks are still readable with on-disk replicas.
* @throws IOException
* @throws InterruptedException
*/
- @Test (timeout=300000)
- public void testRamDiskEviction()
- throws IOException, InterruptedException {
- startUpCluster(REPL_FACTOR,
- new StorageType[] { RAM_DISK, DEFAULT },
- (2 * BLOCK_SIZE - 1)); // 1 replica + delta.
+ @Test (timeout=300000)
+ public void testRamDiskEviction() throws IOException, InterruptedException {
+ startUpCluster(true, 1 + EVICTION_LOW_WATERMARK);
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
@@ -405,16 +393,16 @@ public class TestLazyPersistFiles {
ensureFileReplicasOnStorageType(path1, RAM_DISK);
// Sleep for a short time to allow the lazy writer thread to do its job.
- // However the block replica should not be evicted from RAM_DISK yet.
Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
ensureFileReplicasOnStorageType(path1, RAM_DISK);
// Create another file with a replica on RAM_DISK.
makeTestFile(path2, BLOCK_SIZE, true);
+ Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
triggerBlockReport();
- // Make sure that the second file's block replica is on RAM_DISK, whereas
- // the original file's block replica is now on disk.
+ // Ensure the first file was evicted to disk, the second is still on
+ // RAM_DISK.
ensureFileReplicasOnStorageType(path2, RAM_DISK);
ensureFileReplicasOnStorageType(path1, DEFAULT);
}
@@ -428,9 +416,7 @@ public class TestLazyPersistFiles {
@Test (timeout=300000)
public void testRamDiskEvictionBeforePersist()
throws IOException, InterruptedException {
- // 1 replica + delta, lazy persist interval every 50 minutes
- startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT },
- (2 * BLOCK_SIZE - 1));
+ startUpCluster(true, 1);
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
@@ -463,8 +449,7 @@ public class TestLazyPersistFiles {
@Test (timeout=300000)
public void testRamDiskEvictionLRU()
throws IOException, InterruptedException {
- startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT },
- (4 * BLOCK_SIZE -1)); // 3 replica + delta.
+ startUpCluster(true, 3);
final String METHOD_NAME = GenericTestUtils.getMethodName();
final int NUM_PATHS = 6;
Path paths[] = new Path[NUM_PATHS];
@@ -501,8 +486,7 @@ public class TestLazyPersistFiles {
@Test (timeout=300000)
public void testDeleteBeforePersist()
throws IOException, InterruptedException {
- startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT },
- -1);
+ startUpCluster(true, -1);
final String METHOD_NAME = GenericTestUtils.getMethodName();
stopLazyWriter(cluster.getDataNodes().get(0));
@@ -527,7 +511,7 @@ public class TestLazyPersistFiles {
@Test (timeout=300000)
public void testDeleteAfterPersist()
throws IOException, InterruptedException {
- startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT }, -1);
+ startUpCluster(true, -1);
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path = new Path("/" + METHOD_NAME + ".dat");
@@ -554,8 +538,7 @@ public class TestLazyPersistFiles {
@Test (timeout=300000)
public void testDfsUsageCreateDelete()
throws IOException, InterruptedException {
- startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT },
- 5 * BLOCK_SIZE - 1); // 4 replica + delta
+ startUpCluster(true, 4);
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path = new Path("/" + METHOD_NAME + ".dat");
@@ -586,8 +569,7 @@ public class TestLazyPersistFiles {
@Test (timeout=300000)
public void testConcurrentRead()
throws Exception {
- startUpCluster(REPL_FACTOR, new StorageType[] { DEFAULT, RAM_DISK },
- 3 * BLOCK_SIZE -1); // 2 replicas + delta
+ startUpCluster(true, 2);
final String METHOD_NAME = GenericTestUtils.getMethodName();
final Path path1 = new Path("/" + METHOD_NAME + ".dat");
@@ -638,8 +620,7 @@ public class TestLazyPersistFiles {
@Test (timeout=300000)
public void testConcurrentWrites()
throws IOException, InterruptedException {
- startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT },
- (10 * BLOCK_SIZE -1)); // 9 replica + delta.
+ startUpCluster(true, 9);
final String METHOD_NAME = GenericTestUtils.getMethodName();
final int SEED = 0xFADED;
final int NUM_WRITERS = 4;
@@ -659,8 +640,7 @@ public class TestLazyPersistFiles {
ExecutorService executor = Executors.newFixedThreadPool(THREADPOOL_SIZE);
for (int i = 0; i < NUM_WRITERS; i++) {
- Runnable writer = new WriterRunnable(cluster, i, paths[i], SEED, latch,
- testFailed);
+ Runnable writer = new WriterRunnable(i, paths[i], SEED, latch, testFailed);
executor.execute(writer);
}
@@ -677,9 +657,7 @@ public class TestLazyPersistFiles {
public void testDnRestartWithSavedReplicas()
throws IOException, InterruptedException {
- startUpCluster(REPL_FACTOR,
- new StorageType[] {RAM_DISK, DEFAULT },
- (2 * BLOCK_SIZE - 1)); // 1 replica + delta.
+ startUpCluster(true, -1);
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
@@ -703,9 +681,7 @@ public class TestLazyPersistFiles {
public void testDnRestartWithUnsavedReplicas()
throws IOException, InterruptedException {
- startUpCluster(REPL_FACTOR,
- new StorageType[] {RAM_DISK, DEFAULT },
- (2 * BLOCK_SIZE - 1)); // 1 replica + delta.
+ startUpCluster(true, 1);
stopLazyWriter(cluster.getDataNodes().get(0));
final String METHOD_NAME = GenericTestUtils.getMethodName();
@@ -727,9 +703,8 @@ public class TestLazyPersistFiles {
* If ramDiskReplicaCapacity is >= 0, then RAM_DISK capacity is artificially
* capped. If ramDiskReplicaCapacity < 0 then it is ignored.
*/
- private void startUpCluster(final int numDataNodes,
- final StorageType[] storageTypes,
- final long ramDiskStorageLimit,
+ private void startUpCluster(boolean hasTransientStorage,
+ final int ramDiskReplicaCapacity,
final boolean useSCR)
throws IOException {
@@ -739,42 +714,36 @@ public class TestLazyPersistFiles {
LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC);
conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL_SEC);
conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
- HEARTBEAT_RECHECK_INTERVAL_MSEC);
+ HEARTBEAT_RECHECK_INTERVAL_MSEC);
conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
- LAZY_WRITER_INTERVAL_SEC);
+ LAZY_WRITER_INTERVAL_SEC);
+ conf.setInt(DFS_DATANODE_RAM_DISK_LOW_WATERMARK_REPLICAS,
+ EVICTION_LOW_WATERMARK);
- conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY,useSCR);
+ conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY, useSCR);
- REPL_FACTOR = 1; //Reset in case a test has modified the value
+ long[] capacities = null;
+ if (hasTransientStorage && ramDiskReplicaCapacity >= 0) {
+ // Convert replica count to byte count, add some delta for .meta and VERSION files.
+ long ramDiskStorageLimit = ((long) ramDiskReplicaCapacity * BLOCK_SIZE) + (BLOCK_SIZE - 1);
+ capacities = new long[] { ramDiskStorageLimit, -1 };
+ }
cluster = new MiniDFSCluster
.Builder(conf)
- .numDataNodes(numDataNodes)
- .storageTypes(storageTypes != null ? storageTypes : new StorageType[] { DEFAULT, DEFAULT })
+ .numDataNodes(REPL_FACTOR)
+ .storageCapacities(capacities)
+ .storageTypes(hasTransientStorage ? new StorageType[]{ RAM_DISK, DEFAULT } : null)
.build();
fs = cluster.getFileSystem();
client = fs.getClient();
-
- // Artificially cap the storage capacity of the RAM_DISK volume.
- if (ramDiskStorageLimit >= 0) {
- List<? extends FsVolumeSpi> volumes =
- cluster.getDataNodes().get(0).getFSDataset().getVolumes();
-
- for (FsVolumeSpi volume : volumes) {
- if (volume.getStorageType() == RAM_DISK) {
- ((FsTransientVolumeImpl) volume).setCapacityForTesting(ramDiskStorageLimit);
- }
- }
- }
-
LOG.info("Cluster startup complete");
}
- private void startUpCluster(final int numDataNodes,
- final StorageType[] storageTypes,
- final long ramDiskStorageLimit)
+ private void startUpCluster(boolean hasTransientStorage,
+ final int ramDiskReplicaCapacity)
throws IOException {
- startUpCluster(numDataNodes, storageTypes, ramDiskStorageLimit, false);
+ startUpCluster(hasTransientStorage, ramDiskReplicaCapacity, false);
}
private void makeTestFile(Path path, long length, final boolean isLazyPersist)
@@ -908,17 +877,15 @@ public class TestLazyPersistFiles {
class WriterRunnable implements Runnable {
private final int id;
- private final MiniDFSCluster cluster;
private final Path paths[];
private final int seed;
private CountDownLatch latch;
private AtomicBoolean bFail;
- public WriterRunnable(MiniDFSCluster cluster, int threadIndex, Path[] paths,
+ public WriterRunnable(int threadIndex, Path[] paths,
int seed, CountDownLatch latch,
AtomicBoolean bFail) {
id = threadIndex;
- this.cluster = cluster;
this.paths = paths;
this.seed = seed;
this.latch = latch;