Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2015/06/20 22:29:27 UTC
[1/2] hadoop git commit: HDFS-8192. Eviction should key off used locked memory instead of ram disk free space. (Contributed by Arpit Agarwal)
Repository: hadoop
Updated Branches:
refs/heads/branch-2 5dcda1685 -> 175c5829f
refs/heads/trunk 658b5c84a -> c7d022b66
HDFS-8192. Eviction should key off used locked memory instead of ram disk free space. (Contributed by Arpit Agarwal)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c7d022b6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c7d022b6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c7d022b6
Branch: refs/heads/trunk
Commit: c7d022b66f0c5baafbb7000a435c1d6e39906efe
Parents: 658b5c8
Author: Arpit Agarwal <ar...@apache.org>
Authored: Sat Jun 20 13:27:52 2015 -0700
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Sat Jun 20 13:27:52 2015 -0700
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 4 -
.../datanode/fsdataset/impl/FsDatasetCache.java | 7 +
.../datanode/fsdataset/impl/FsDatasetImpl.java | 98 +++++-----
.../hdfs/server/balancer/TestBalancer.java | 1 -
.../fsdataset/impl/LazyPersistTestCase.java | 42 ++--
.../impl/TestLazyPersistLockedMemory.java | 25 ++-
.../impl/TestLazyPersistReplicaPlacement.java | 36 +++-
.../datanode/fsdataset/impl/TestLazyWriter.java | 62 +++---
.../fsdataset/impl/TestScrLazyPersistFiles.java | 193 +++++++------------
10 files changed, 224 insertions(+), 247 deletions(-)
----------------------------------------------------------------------
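In short, the patch drops the RAM disk free-space watermark configuration and keys eviction off the locked-memory accounting already maintained by FsDatasetCache. A minimal sketch of the new trigger condition, using names that appear in the FsDatasetImpl diff below (the standalone method itself is illustrative, not part of the patch):

    // Illustrative: evict while locked-memory headroom is below the request.
    // cacheCapacity and cacheUsed come from the FsDatasetCache accounting.
    boolean shouldEvict(long cacheCapacity, long cacheUsed, long bytesNeeded) {
      return (cacheCapacity - cacheUsed) < bytesNeeded;
    }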
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7d022b6/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index aad3c25..2e030b9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -658,6 +658,9 @@ Release 2.8.0 - UNRELEASED
do not generate spurious reconfig warnings (Lei (Eddy) Xu via Colin P.
McCabe)
+ HDFS-8192. Eviction should key off used locked memory instead of
+ ram disk free space. (Arpit Agarwal)
+
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7d022b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 5ce2863..30540a9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -94,10 +94,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC = 60;
public static final String DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_KEY = "dfs.datanode.ram.disk.replica.tracker";
public static final Class<RamDiskReplicaLruTracker> DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_DEFAULT = RamDiskReplicaLruTracker.class;
- public static final String DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT = "dfs.datanode.ram.disk.low.watermark.percent";
- public static final float DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT_DEFAULT = 10.0f;
- public static final String DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES = "dfs.datanode.ram.disk.low.watermark.bytes";
- public static final long DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES_DEFAULT = DFS_BLOCK_SIZE_DEFAULT;
public static final String DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY = "dfs.datanode.network.counts.cache.max.size";
public static final int DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT = Integer.MAX_VALUE;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7d022b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
index 6f524b2..f70d4af 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
@@ -405,6 +405,13 @@ public class FsDatasetCache {
}
/**
+ * Round up to the OS page size.
+ */
+ long roundUpPageSize(long count) {
+ return usedBytesCount.rounder.roundUp(count);
+ }
+
+ /**
* Background worker that mmaps, mlocks, and checksums a block
*/
private class CachingTask implements Runnable {
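For context, the new roundUpPageSize delegates to the rounder of the existing used-bytes counter. A self-contained sketch of the same arithmetic, assuming the page size is passed in explicitly rather than taken from the counter:

    // Hypothetical standalone equivalent of roundUpPageSize(count):
    // round count up to the next multiple of the OS page size.
    static long roundUpToPageSize(long count, long osPageSize) {
      return ((count + osPageSize - 1) / osPageSize) * osPageSize;
    }
    // e.g. with 4096-byte pages, roundUpToPageSize(5000, 4096) == 8192.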
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7d022b6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 8ebd214..a1ff918 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -1302,14 +1302,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
if (allowLazyPersist &&
lazyWriter != null &&
b.getNumBytes() % cacheManager.getOsPageSize() == 0 &&
- (cacheManager.reserve(b.getNumBytes())) > 0) {
+ reserveLockedMemory(b.getNumBytes())) {
try {
// First try to place the block on a transient volume.
ref = volumes.getNextTransientVolume(b.getNumBytes());
datanode.getMetrics().incrRamDiskBlocksWrite();
} catch(DiskOutOfSpaceException de) {
// Ignore the exception since we just fall back to persistent storage.
- datanode.getMetrics().incrRamDiskBlocksWriteFallback();
} finally {
if (ref == null) {
cacheManager.release(b.getNumBytes());
@@ -1323,6 +1322,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
// create an rbw file to hold block in the designated volume
+
+ if (allowLazyPersist && !v.isTransientStorage()) {
+ datanode.getMetrics().incrRamDiskBlocksWriteFallback();
+ }
+
File f;
try {
f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
@@ -2833,20 +2837,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
class LazyWriter implements Runnable {
private volatile boolean shouldRun = true;
final int checkpointerInterval;
- final float lowWatermarkFreeSpacePercentage;
- final long lowWatermarkFreeSpaceBytes;
-
public LazyWriter(Configuration conf) {
this.checkpointerInterval = conf.getInt(
DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC);
- this.lowWatermarkFreeSpacePercentage = conf.getFloat(
- DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT,
- DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT_DEFAULT);
- this.lowWatermarkFreeSpaceBytes = conf.getLong(
- DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES,
- DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES_DEFAULT);
}
/**
@@ -2908,41 +2903,17 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
return succeeded;
}
- private boolean transientFreeSpaceBelowThreshold() throws IOException {
- long free = 0;
- long capacity = 0;
- float percentFree = 0.0f;
-
- // Don't worry about fragmentation for now. We don't expect more than one
- // transient volume per DN.
- try (FsVolumeReferences volumes = getFsVolumeReferences()) {
- for (FsVolumeSpi fvs : volumes) {
- FsVolumeImpl v = (FsVolumeImpl) fvs;
- if (v.isTransientStorage()) {
- capacity += v.getCapacity();
- free += v.getAvailable();
- }
- }
- }
-
- if (capacity == 0) {
- return false;
- }
-
- percentFree = (float) ((double)free * 100 / capacity);
- return (percentFree < lowWatermarkFreeSpacePercentage) ||
- (free < lowWatermarkFreeSpaceBytes);
- }
-
/**
* Attempt to evict one or more transient block replicas until we
- * have at least spaceNeeded bytes free.
+ * have at least bytesNeeded bytes free.
*/
- private void evictBlocks() throws IOException {
+ public void evictBlocks(long bytesNeeded) throws IOException {
int iterations = 0;
+ final long cacheCapacity = cacheManager.getCacheCapacity();
+
while (iterations++ < MAX_BLOCK_EVICTIONS_PER_ITERATION &&
- transientFreeSpaceBelowThreshold()) {
+ (cacheCapacity - cacheManager.getCacheUsed()) < bytesNeeded) {
RamDiskReplica replicaState = ramDiskReplicaTracker.getNextCandidateForEviction();
if (replicaState == null) {
@@ -2959,7 +2930,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
final String bpid = replicaState.getBlockPoolId();
synchronized (FsDatasetImpl.this) {
- replicaInfo = getReplicaInfo(replicaState.getBlockPoolId(), replicaState.getBlockId());
+ replicaInfo = getReplicaInfo(replicaState.getBlockPoolId(),
+ replicaState.getBlockId());
Preconditions.checkState(replicaInfo.getVolume().isTransientStorage());
blockFile = replicaInfo.getBlockFile();
metaFile = replicaInfo.getMetaFile();
@@ -2968,7 +2940,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
ramDiskReplicaTracker.discardReplica(replicaState.getBlockPoolId(),
replicaState.getBlockId(), false);
- // Move the replica from lazyPersist/ to finalized/ on target volume
+ // Move the replica from lazyPersist/ to finalized/ on
+ // the target volume
BlockPoolSlice bpSlice =
replicaState.getLazyPersistVolume().getBlockPoolSlice(bpid);
File newBlockFile = bpSlice.activateSavedReplica(
@@ -2992,10 +2965,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
if (replicaState.getNumReads() == 0) {
datanode.getMetrics().incrRamDiskBlocksEvictedWithoutRead();
}
- }
- removeOldReplica(replicaInfo, newReplicaInfo, blockFile, metaFile,
- blockFileUsed, metaFileUsed, bpid);
+ // Delete the block+meta files from RAM disk and release locked
+ // memory.
+ removeOldReplica(replicaInfo, newReplicaInfo, blockFile, metaFile,
+ blockFileUsed, metaFileUsed, bpid);
+ }
}
}
@@ -3006,7 +2981,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
while (fsRunning && shouldRun) {
try {
numSuccessiveFailures = saveNextReplica() ? 0 : (numSuccessiveFailures + 1);
- evictBlocks();
// Sleep if we have no more work to do or if it looks like we are not
// making any forward progress. This is to ensure that if all persist
@@ -3094,5 +3068,37 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
cacheManager.releaseRoundDown(count);
}
}
+
+ /**
+ * Attempt to evict blocks from the cache manager to free the requested
+ * bytes.
+ *
+ * @param bytesNeeded number of bytes to free up.
+ */
+ @VisibleForTesting
+ public void evictLazyPersistBlocks(long bytesNeeded) {
+ try {
+ ((LazyWriter) lazyWriter.getRunnable()).evictBlocks(bytesNeeded);
+ } catch(IOException ioe) {
+ LOG.info("Ignoring exception ", ioe);
+ }
+ }
+
+ /**
+ * Attempt to reserve the given amount of memory with the cache manager.
+ * @param bytesNeeded number of bytes to reserve.
+ * @return true if the reservation succeeded, false otherwise.
+ */
+ boolean reserveLockedMemory(long bytesNeeded) {
+ if (cacheManager.reserve(bytesNeeded) > 0) {
+ return true;
+ }
+
+ // Round up bytes needed to osPageSize and attempt to evict
+ // one or more blocks to free up the reservation.
+ bytesNeeded = cacheManager.roundUpPageSize(bytesNeeded);
+ evictLazyPersistBlocks(bytesNeeded);
+ return cacheManager.reserve(bytesNeeded) > 0;
+ }
}
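The last two additions are the heart of the change: reservation now drives eviction directly instead of relying on the background watermark check removed above. A condensed sketch of that control flow, mirroring the diff (error handling and logging elided):

    // Sketch of the reserve-evict-retry flow from reserveLockedMemory():
    boolean reserveWithEviction(long bytesNeeded) {
      if (cacheManager.reserve(bytesNeeded) > 0) {
        return true;                              // fast path: headroom exists
      }
      long rounded = cacheManager.roundUpPageSize(bytesNeeded);
      evictLazyPersistBlocks(rounded);            // best-effort synchronous eviction
      return cacheManager.reserve(rounded) > 0;   // one retry after evicting
    }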
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7d022b6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index 1f7bade..e1ce1b3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -130,7 +130,6 @@ public class TestBalancer {
conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500);
conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC, 1);
- conf.setInt(DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES, DEFAULT_RAM_DISK_BLOCK_SIZE);
LazyPersistTestCase.initCacheManipulator();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7d022b6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java
index 5ce5cc6..ce29fc8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+import com.google.common.base.Supplier;
+import org.apache.commons.lang.UnhandledException;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import static org.apache.hadoop.fs.CreateFlag.CREATE;
@@ -37,6 +39,7 @@ import java.util.EnumSet;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.TimeoutException;
import com.google.common.base.Preconditions;
import org.apache.commons.io.IOUtils;
@@ -55,6 +58,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@@ -79,7 +83,6 @@ public abstract class LazyPersistTestCase {
protected static final int BLOCK_SIZE = 5 * 1024 * 1024;
protected static final int BUFFER_LENGTH = 4096;
- protected static final int EVICTION_LOW_WATERMARK = 1;
private static final long HEARTBEAT_INTERVAL_SEC = 1;
private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
private static final String JMX_RAM_DISK_METRICS_PATTERN = "^RamDisk";
@@ -236,7 +239,6 @@ public abstract class LazyPersistTestCase {
StorageType[] storageTypes,
int ramDiskReplicaCapacity,
long ramDiskStorageLimit,
- long evictionLowWatermarkReplicas,
long maxLockedMemory,
boolean useSCR,
boolean useLegacyBlockReaderLocal,
@@ -256,8 +258,6 @@ public abstract class LazyPersistTestCase {
HEARTBEAT_RECHECK_INTERVAL_MSEC);
conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
LAZY_WRITER_INTERVAL_SEC);
- conf.setLong(DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES,
- evictionLowWatermarkReplicas * BLOCK_SIZE);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY, 1);
conf.setLong(DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, maxLockedMemory);
@@ -389,12 +389,6 @@ public abstract class LazyPersistTestCase {
return this;
}
- public ClusterWithRamDiskBuilder setEvictionLowWatermarkReplicas(
- long evictionLowWatermarkReplicas) {
- this.evictionLowWatermarkReplicas = evictionLowWatermarkReplicas;
- return this;
- }
-
public ClusterWithRamDiskBuilder disableScrubber() {
this.disableScrubber = true;
return this;
@@ -403,8 +397,8 @@ public abstract class LazyPersistTestCase {
public void build() throws IOException {
LazyPersistTestCase.this.startUpCluster(
numDatanodes, hasTransientStorage, storageTypes, ramDiskReplicaCapacity,
- ramDiskStorageLimit, evictionLowWatermarkReplicas,
- maxLockedMemory, useScr, useLegacyBlockReaderLocal, disableScrubber);
+ ramDiskStorageLimit, maxLockedMemory, useScr, useLegacyBlockReaderLocal,
+ disableScrubber);
}
private int numDatanodes = REPL_FACTOR;
@@ -415,7 +409,6 @@ public abstract class LazyPersistTestCase {
private boolean hasTransientStorage = true;
private boolean useScr = false;
private boolean useLegacyBlockReaderLocal = false;
- private long evictionLowWatermarkReplicas = EVICTION_LOW_WATERMARK;
private boolean disableScrubber=false;
}
@@ -513,4 +506,27 @@ public abstract class LazyPersistTestCase {
e.printStackTrace();
}
}
+
+ protected void waitForMetric(final String metricName, final int expectedValue)
+ throws TimeoutException, InterruptedException {
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ try {
+ final int currentValue = Integer.parseInt(jmx.getValue(metricName));
+ LOG.info("Waiting for " + metricName +
+ " to reach value " + expectedValue +
+ ", current value = " + currentValue);
+ return currentValue == expectedValue;
+ } catch (Exception e) {
+ throw new UnhandledException("Test failed due to unexpected exception", e);
+ }
+ }
+ }, 1000, Integer.MAX_VALUE);
+ }
+
+ protected void triggerEviction(DataNode dn) {
+ FsDatasetImpl fsDataset = (FsDatasetImpl) dn.getFSDataset();
+ fsDataset.evictLazyPersistBlocks(Long.MAX_VALUE); // Run one eviction cycle.
+ }
}
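The two new helpers above let tests wait on DataNode JMX metrics and force an eviction pass instead of sleeping for the lazy writer. A typical usage sketch inside a subclass, assuming path and cluster are set up by the test harness as elsewhere in this class:

    // Sketch: deterministic replacement for Thread.sleep()-based waits.
    makeTestFile(path, BLOCK_SIZE, true);
    waitForMetric("RamDiskBlocksLazyPersisted", 1);   // poll JMX until persisted
    triggerEviction(cluster.getDataNodes().get(0));   // run one eviction cycle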
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7d022b6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistLockedMemory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistLockedMemory.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistLockedMemory.java
index 9ea4665..eef8f0b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistLockedMemory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistLockedMemory.java
@@ -28,9 +28,7 @@ import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
-import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.test.MetricsAsserts;
import org.junit.Test;
import java.io.IOException;
@@ -103,25 +101,26 @@ public class TestLazyPersistLockedMemory extends LazyPersistTestCase {
* Verify that locked RAM is released when blocks are evicted from RAM disk.
*/
@Test
- public void testReleaseOnEviction()
- throws IOException, TimeoutException, InterruptedException {
+ public void testReleaseOnEviction() throws Exception {
getClusterBuilder().setNumDatanodes(1)
.setMaxLockedMemory(BLOCK_SIZE)
.setRamDiskReplicaCapacity(BLOCK_SIZE * 2 - 1)
.build();
final String METHOD_NAME = GenericTestUtils.getMethodName();
- final FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset();
+ final FsDatasetImpl fsd =
+ (FsDatasetImpl) cluster.getDataNodes().get(0).getFSDataset();
- Path path = new Path("/" + METHOD_NAME + ".dat");
- makeTestFile(path, BLOCK_SIZE, true);
+ Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+ makeTestFile(path1, BLOCK_SIZE, true);
+ assertThat(fsd.getCacheUsed(), is((long) BLOCK_SIZE));
- // The block should get evicted soon since it pushes RAM disk free
- // space below the threshold.
- waitForLockedBytesUsed(fsd, 0);
+ // Wait until the replica is written to persistent storage.
+ waitForMetric("RamDiskBlocksLazyPersisted", 1);
- MetricsRecordBuilder rb =
- MetricsAsserts.getMetrics(cluster.getDataNodes().get(0).getMetrics().name());
- MetricsAsserts.assertCounter("RamDiskBlocksEvicted", 1L, rb);
+ // Trigger eviction and verify locked bytes were released.
+ fsd.evictLazyPersistBlocks(Long.MAX_VALUE);
+ verifyRamDiskJMXMetric("RamDiskBlocksEvicted", 1);
+ waitForLockedBytesUsed(fsd, 0);
}
/**
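The reworked test pins down the invariant the patch introduces: evicting a lazy-persisted replica must release its locked memory. Stated as a two-line property, with fsd obtained as in the test above:

    // Property under test: a full eviction pass drains locked memory to zero.
    fsd.evictLazyPersistBlocks(Long.MAX_VALUE);  // evict everything evictable
    waitForLockedBytesUsed(fsd, 0);              // locked bytes released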
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7d022b6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistReplicaPlacement.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistReplicaPlacement.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistReplicaPlacement.java
index 018eaba..c89475a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistReplicaPlacement.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistReplicaPlacement.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.test.GenericTestUtils;
@@ -28,6 +29,8 @@ import java.io.IOException;
import static org.apache.hadoop.fs.StorageType.DEFAULT;
import static org.apache.hadoop.fs.StorageType.RAM_DISK;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.fail;
public class TestLazyPersistReplicaPlacement extends LazyPersistTestCase {
@@ -70,32 +73,50 @@ public class TestLazyPersistReplicaPlacement extends LazyPersistTestCase {
ensureFileReplicasOnStorageType(path, DEFAULT);
}
+ @Test
+ public void testSynchronousEviction() throws Exception {
+ getClusterBuilder().setMaxLockedMemory(BLOCK_SIZE).build();
+ final String METHOD_NAME = GenericTestUtils.getMethodName();
+
+ final Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+ makeTestFile(path1, BLOCK_SIZE, true);
+ ensureFileReplicasOnStorageType(path1, RAM_DISK);
+
+ // Wait until the replica is written to persistent storage.
+ waitForMetric("RamDiskBlocksLazyPersisted", 1);
+
+ // Ensure that writing a new file to RAM DISK evicts the block
+ // for the previous one.
+ Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
+ makeTestFile(path2, BLOCK_SIZE, true);
+ verifyRamDiskJMXMetric("RamDiskBlocksEvictedWithoutRead", 1);
+ }
+
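Metric assertions like the one above go through the DataNode's RamDisk JMX bean. A hedged sketch of what verifyRamDiskJMXMetric boils down to, reusing the jmx helper visible in LazyPersistTestCase.waitForMetric:

    // Sketch: read a RamDisk metric via JMX and assert its value.
    int evicted = Integer.parseInt(jmx.getValue("RamDiskBlocksEvictedWithoutRead"));
    assertThat(evicted, is(1));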
/**
* File can not fit in RamDisk even with eviction
* @throws IOException
*/
@Test
public void testFallbackToDiskFull() throws Exception {
- getClusterBuilder().setRamDiskReplicaCapacity(0).build();
+ getClusterBuilder().setMaxLockedMemory(BLOCK_SIZE / 2).build();
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path = new Path("/" + METHOD_NAME + ".dat");
makeTestFile(path, BLOCK_SIZE, true);
ensureFileReplicasOnStorageType(path, DEFAULT);
-
verifyRamDiskJMXMetric("RamDiskBlocksWriteFallback", 1);
}
/**
* File partially fit in RamDisk after eviction.
* RamDisk can fit 2 blocks. Write a file with 5 blocks.
- * Expect 2 or less blocks are on RamDisk and 3 or more on disk.
+ * Expect 2 blocks to be on RamDisk and the rest on disk.
* @throws IOException
*/
@Test
public void testFallbackToDiskPartial()
throws IOException, InterruptedException {
- getClusterBuilder().setRamDiskReplicaCapacity(2).build();
+ getClusterBuilder().setMaxLockedMemory(2 * BLOCK_SIZE).build();
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path = new Path("/" + METHOD_NAME + ".dat");
@@ -122,8 +143,8 @@ public class TestLazyPersistReplicaPlacement extends LazyPersistTestCase {
// Since eviction is asynchronous, depending on the timing of eviction
// wrt writes, we may get 2 or less blocks on RAM disk.
- assert(numBlocksOnRamDisk <= 2);
- assert(numBlocksOnDisk >= 3);
+ assertThat(numBlocksOnRamDisk, is(2));
+ assertThat(numBlocksOnDisk, is(3));
}
/**
@@ -134,7 +155,8 @@ public class TestLazyPersistReplicaPlacement extends LazyPersistTestCase {
*/
@Test
public void testRamDiskNotChosenByDefault() throws IOException {
- getClusterBuilder().build();
+ getClusterBuilder().setStorageTypes(new StorageType[] {RAM_DISK, RAM_DISK})
+ .build();
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path = new Path("/" + METHOD_NAME + ".dat");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7d022b6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyWriter.java
index ee8aaf0..6b16066 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyWriter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyWriter.java
@@ -28,6 +28,7 @@ import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.concurrent.TimeoutException;
import static org.apache.hadoop.fs.StorageType.DEFAULT;
import static org.apache.hadoop.fs.StorageType.RAM_DISK;
@@ -38,18 +39,16 @@ import static org.junit.Assert.assertTrue;
public class TestLazyWriter extends LazyPersistTestCase {
@Test
public void testLazyPersistBlocksAreSaved()
- throws IOException, InterruptedException {
+ throws IOException, InterruptedException, TimeoutException {
getClusterBuilder().build();
+ final int NUM_BLOCKS = 10;
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path = new Path("/" + METHOD_NAME + ".dat");
// Create a test file
- makeTestFile(path, BLOCK_SIZE * 10, true);
+ makeTestFile(path, BLOCK_SIZE * NUM_BLOCKS, true);
LocatedBlocks locatedBlocks = ensureFileReplicasOnStorageType(path, RAM_DISK);
-
- // Sleep for a short time to allow the lazy writer thread to do its job
- Thread.sleep(6 * LAZY_WRITER_INTERVAL_SEC * 1000);
-
+ waitForMetric("RamDiskBlocksLazyPersisted", NUM_BLOCKS);
LOG.info("Verifying copy was saved to lazyPersist/");
// Make sure that there is a saved copy of the replica on persistent
@@ -57,35 +56,22 @@ public class TestLazyWriter extends LazyPersistTestCase {
ensureLazyPersistBlocksAreSaved(locatedBlocks);
}
- /**
- * RamDisk eviction after lazy persist to disk.
- * @throws Exception
- */
@Test
- public void testRamDiskEviction() throws Exception {
- getClusterBuilder().setRamDiskReplicaCapacity(1 + EVICTION_LOW_WATERMARK).build();
+ public void testSynchronousEviction() throws Exception {
+ getClusterBuilder().setMaxLockedMemory(BLOCK_SIZE).build();
final String METHOD_NAME = GenericTestUtils.getMethodName();
- Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
- Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
- final int SEED = 0xFADED;
- makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
+ final Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+ makeTestFile(path1, BLOCK_SIZE, true);
ensureFileReplicasOnStorageType(path1, RAM_DISK);
- // Sleep for a short time to allow the lazy writer thread to do its job.
- Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
- ensureFileReplicasOnStorageType(path1, RAM_DISK);
+ // Wait until the replica is written to persistent storage.
+ waitForMetric("RamDiskBlocksLazyPersisted", 1);
- // Create another file with a replica on RAM_DISK.
+ // Ensure that writing a new file to RAM DISK evicts the block
+ // for the previous one.
+ Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
makeTestFile(path2, BLOCK_SIZE, true);
- Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
- triggerBlockReport();
-
- // Ensure the first file was evicted to disk, the second is still on
- // RAM_DISK.
- ensureFileReplicasOnStorageType(path2, RAM_DISK);
- ensureFileReplicasOnStorageType(path1, DEFAULT);
-
verifyRamDiskJMXMetric("RamDiskBlocksEvicted", 1);
verifyRamDiskJMXMetric("RamDiskBlocksEvictedWithoutRead", 1);
}
@@ -98,8 +84,8 @@ public class TestLazyWriter extends LazyPersistTestCase {
*/
@Test
public void testRamDiskEvictionBeforePersist()
- throws IOException, InterruptedException {
- getClusterBuilder().setRamDiskReplicaCapacity(1).build();
+ throws Exception {
+ getClusterBuilder().setMaxLockedMemory(BLOCK_SIZE).build();
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
@@ -116,6 +102,7 @@ public class TestLazyWriter extends LazyPersistTestCase {
// Eviction should not happen for block of the first file that is not
// persisted yet.
+ verifyRamDiskJMXMetric("RamDiskBlocksEvicted", 0);
ensureFileReplicasOnStorageType(path1, RAM_DISK);
ensureFileReplicasOnStorageType(path2, DEFAULT);
@@ -133,7 +120,7 @@ public class TestLazyWriter extends LazyPersistTestCase {
public void testRamDiskEvictionIsLru()
throws Exception {
final int NUM_PATHS = 5;
- getClusterBuilder().setRamDiskReplicaCapacity(NUM_PATHS + EVICTION_LOW_WATERMARK).build();
+ getClusterBuilder().setMaxLockedMemory(NUM_PATHS * BLOCK_SIZE).build();
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path paths[] = new Path[NUM_PATHS * 2];
@@ -145,8 +132,7 @@ public class TestLazyWriter extends LazyPersistTestCase {
makeTestFile(paths[i], BLOCK_SIZE, true);
}
- // Sleep for a short time to allow the lazy writer thread to do its job.
- Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+ waitForMetric("RamDiskBlocksLazyPersisted", NUM_PATHS);
for (int i = 0; i < NUM_PATHS; ++i) {
ensureFileReplicasOnStorageType(paths[i], RAM_DISK);
@@ -227,16 +213,13 @@ public class TestLazyWriter extends LazyPersistTestCase {
makeTestFile(path, BLOCK_SIZE, true);
LocatedBlocks locatedBlocks = ensureFileReplicasOnStorageType(path, RAM_DISK);
-
- // Sleep for a short time to allow the lazy writer thread to do its job
- Thread.sleep(6 * LAZY_WRITER_INTERVAL_SEC * 1000);
+ waitForMetric("RamDiskBlocksLazyPersisted", 1);
// Delete after persist
client.delete(path.toString(), false);
Assert.assertFalse(fs.exists(path));
assertThat(verifyDeletedBlocks(locatedBlocks), is(true));
-
verifyRamDiskJMXMetric("RamDiskBlocksLazyPersisted", 1);
verifyRamDiskJMXMetric("RamDiskBytesLazyPersisted", BLOCK_SIZE);
}
@@ -248,7 +231,7 @@ public class TestLazyWriter extends LazyPersistTestCase {
*/
@Test
public void testDfsUsageCreateDelete()
- throws IOException, InterruptedException {
+ throws IOException, InterruptedException, TimeoutException {
getClusterBuilder().setRamDiskReplicaCapacity(4).build();
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path = new Path("/" + METHOD_NAME + ".dat");
@@ -261,8 +244,7 @@ public class TestLazyWriter extends LazyPersistTestCase {
assertThat(usedAfterCreate, is((long) BLOCK_SIZE));
- // Sleep for a short time to allow the lazy writer thread to do its job
- Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+ waitForMetric("RamDiskBlocksLazyPersisted", 1);
long usedAfterPersist = fs.getUsed();
assertThat(usedAfterPersist, is((long) BLOCK_SIZE));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c7d022b6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestScrLazyPersistFiles.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestScrLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestScrLazyPersistFiles.java
index 7c7ba64..2512588 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestScrLazyPersistFiles.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestScrLazyPersistFiles.java
@@ -16,6 +16,7 @@
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+import com.google.common.base.Preconditions;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -26,6 +27,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.NativeCodeLoader;
@@ -39,13 +41,20 @@ import org.junit.rules.ExpectedException;
import java.io.File;
import java.io.IOException;
+import java.util.concurrent.TimeoutException;
import static org.apache.hadoop.fs.StorageType.DEFAULT;
import static org.apache.hadoop.fs.StorageType.RAM_DISK;
import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
+/**
+ * Test Lazy persist behavior with short-circuit reads. These tests
+ * will be run on Linux only with Native IO enabled. The tests fake
+ * RAM_DISK storage using local disk.
+ */
public class TestScrLazyPersistFiles extends LazyPersistTestCase {
@BeforeClass
@@ -58,6 +67,10 @@ public class TestScrLazyPersistFiles extends LazyPersistTestCase {
Assume.assumeThat(NativeCodeLoader.isNativeCodeLoaded() && !Path.WINDOWS,
equalTo(true));
Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
+
+ final long osPageSize = NativeIO.POSIX.getCacheManipulator().getOperatingSystemPageSize();
+ Preconditions.checkState(BLOCK_SIZE >= osPageSize);
+ Preconditions.checkState(BLOCK_SIZE % osPageSize == 0);
}
@Rule
@@ -69,35 +82,27 @@ public class TestScrLazyPersistFiles extends LazyPersistTestCase {
*/
@Test
public void testRamDiskShortCircuitRead()
- throws IOException, InterruptedException {
- getClusterBuilder().setNumDatanodes(REPL_FACTOR)
- .setStorageTypes(new StorageType[]{RAM_DISK, DEFAULT})
- .setRamDiskStorageLimit(2 * BLOCK_SIZE - 1)
- .setUseScr(true)
- .build();
+ throws IOException, InterruptedException, TimeoutException {
+ getClusterBuilder().setUseScr(true).build();
final String METHOD_NAME = GenericTestUtils.getMethodName();
final int SEED = 0xFADED;
Path path = new Path("/" + METHOD_NAME + ".dat");
+ // Create a file and wait till it is persisted.
makeRandomTestFile(path, BLOCK_SIZE, true, SEED);
ensureFileReplicasOnStorageType(path, RAM_DISK);
+ waitForMetric("RamDiskBlocksLazyPersisted", 1);
- // Sleep for a short time to allow the lazy writer thread to do its job
- Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
-
- //assertThat(verifyReadRandomFile(path, BLOCK_SIZE, SEED), is(true));
- FSDataInputStream fis = fs.open(path);
+ HdfsDataInputStream fis = (HdfsDataInputStream) fs.open(path);
// Verify SCR read counters
try {
- fis = fs.open(path);
byte[] buf = new byte[BUFFER_LENGTH];
fis.read(0, buf, 0, BUFFER_LENGTH);
- HdfsDataInputStream dfsis = (HdfsDataInputStream) fis;
Assert.assertEquals(BUFFER_LENGTH,
- dfsis.getReadStatistics().getTotalBytesRead());
+ fis.getReadStatistics().getTotalBytesRead());
Assert.assertEquals(BUFFER_LENGTH,
- dfsis.getReadStatistics().getTotalShortCircuitBytesRead());
+ fis.getReadStatistics().getTotalShortCircuitBytesRead());
} finally {
fis.close();
fis = null;
@@ -111,106 +116,77 @@ public class TestScrLazyPersistFiles extends LazyPersistTestCase {
* @throws InterruptedException
*/
@Test
- public void testRamDiskEvictionWithShortCircuitReadHandle()
- throws IOException, InterruptedException {
- // 5 replica + delta, SCR.
- getClusterBuilder().setNumDatanodes(REPL_FACTOR)
- .setStorageTypes(new StorageType[]{RAM_DISK, DEFAULT})
- .setRamDiskStorageLimit(6 * BLOCK_SIZE - 1)
- .setEvictionLowWatermarkReplicas(3)
- .setUseScr(true)
- .build();
-
+ public void testScrDuringEviction()
+ throws Exception {
+ getClusterBuilder().setUseScr(true).build();
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
- Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
- final int SEED = 0xFADED;
- makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
+ // Create a file and wait till it is persisted.
+ makeTestFile(path1, BLOCK_SIZE, true);
ensureFileReplicasOnStorageType(path1, RAM_DISK);
+ waitForMetric("RamDiskBlocksLazyPersisted", 1);
- // Sleep for a short time to allow the lazy writer thread to do its job.
- // However the block replica should not be evicted from RAM_DISK yet.
- Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
-
- // No eviction should happen as the free ratio is below the threshold
- FSDataInputStream fis = fs.open(path1);
+ HdfsDataInputStream fis = (HdfsDataInputStream) fs.open(path1);
try {
// Keep and open read handle to path1 while creating path2
byte[] buf = new byte[BUFFER_LENGTH];
fis.read(0, buf, 0, BUFFER_LENGTH);
-
- // Create the 2nd file that will trigger RAM_DISK eviction.
- makeTestFile(path2, BLOCK_SIZE * 2, true);
- ensureFileReplicasOnStorageType(path2, RAM_DISK);
+ triggerEviction(cluster.getDataNodes().get(0));
// Ensure path1 is still readable from the open SCR handle.
- fis.read(fis.getPos(), buf, 0, BUFFER_LENGTH);
- HdfsDataInputStream dfsis = (HdfsDataInputStream) fis;
- Assert.assertEquals(2 * BUFFER_LENGTH,
- dfsis.getReadStatistics().getTotalBytesRead());
- Assert.assertEquals(2 * BUFFER_LENGTH,
- dfsis.getReadStatistics().getTotalShortCircuitBytesRead());
+ fis.read(0, buf, 0, BUFFER_LENGTH);
+ assertThat(fis.getReadStatistics().getTotalBytesRead(),
+ is((long) 2 * BUFFER_LENGTH));
+ assertThat(fis.getReadStatistics().getTotalShortCircuitBytesRead(),
+ is((long) 2 * BUFFER_LENGTH));
} finally {
IOUtils.closeQuietly(fis);
}
-
- // After the open handle is closed, path1 should be evicted to DISK.
- triggerBlockReport();
- ensureFileReplicasOnStorageType(path1, DEFAULT);
}
@Test
- public void testShortCircuitReadAfterEviction()
- throws IOException, InterruptedException {
- Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
- getClusterBuilder().setRamDiskReplicaCapacity(1 + EVICTION_LOW_WATERMARK)
- .setUseScr(true)
+ public void testScrAfterEviction()
+ throws IOException, InterruptedException, TimeoutException {
+ getClusterBuilder().setUseScr(true)
.setUseLegacyBlockReaderLocal(false)
.build();
doShortCircuitReadAfterEvictionTest();
}
@Test
- public void testLegacyShortCircuitReadAfterEviction()
- throws IOException, InterruptedException {
- getClusterBuilder().setRamDiskReplicaCapacity(1 + EVICTION_LOW_WATERMARK)
- .setUseScr(true)
+ public void testLegacyScrAfterEviction()
+ throws IOException, InterruptedException, TimeoutException {
+ getClusterBuilder().setUseScr(true)
.setUseLegacyBlockReaderLocal(true)
.build();
doShortCircuitReadAfterEvictionTest();
+
+ // In the implementation of legacy short-circuit reads, any failure is
+ // trapped silently: the client reverts to a remote read and disables all
+ // subsequent legacy short-circuit reads in the ClientContext.
+ // Assert that it didn't get disabled.
+ ClientContext clientContext = client.getClientContext();
+ Assert.assertFalse(clientContext.getDisableLegacyBlockReaderLocal());
}
private void doShortCircuitReadAfterEvictionTest() throws IOException,
- InterruptedException {
+ InterruptedException, TimeoutException {
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
- Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
final int SEED = 0xFADED;
makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
+ ensureFileReplicasOnStorageType(path1, RAM_DISK);
+ waitForMetric("RamDiskBlocksLazyPersisted", 1);
// Verify short-circuit read from RAM_DISK.
- ensureFileReplicasOnStorageType(path1, RAM_DISK);
File metaFile = cluster.getBlockMetadataFile(0,
DFSTestUtil.getFirstBlock(fs, path1));
assertTrue(metaFile.length() <= BlockMetadataHeader.getHeaderSize());
assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED));
- // Sleep for a short time to allow the lazy writer thread to do its job.
- Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
-
- // Verify short-circuit read from RAM_DISK once again.
- ensureFileReplicasOnStorageType(path1, RAM_DISK);
- metaFile = cluster.getBlockMetadataFile(0,
- DFSTestUtil.getFirstBlock(fs, path1));
- assertTrue(metaFile.length() <= BlockMetadataHeader.getHeaderSize());
- assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED));
-
- // Create another file with a replica on RAM_DISK, which evicts the first.
- makeRandomTestFile(path2, BLOCK_SIZE, true, SEED);
- Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
- triggerBlockReport();
+ triggerEviction(cluster.getDataNodes().get(0));
// Verify short-circuit read still works from DEFAULT storage. This time,
// we'll have a checksum written during lazy persistence.
@@ -219,54 +195,35 @@ public class TestScrLazyPersistFiles extends LazyPersistTestCase {
DFSTestUtil.getFirstBlock(fs, path1));
assertTrue(metaFile.length() > BlockMetadataHeader.getHeaderSize());
assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED));
-
- // In the implementation of legacy short-circuit reads, any failure is
- // trapped silently, reverts back to a remote read, and also disables all
- // subsequent legacy short-circuit reads in the ClientContext. If the test
- // uses legacy, then assert that it didn't get disabled.
- ClientContext clientContext = client.getClientContext();
- if (clientContext.getUseLegacyBlockReaderLocal()) {
- Assert.assertFalse(clientContext.getDisableLegacyBlockReaderLocal());
- }
}
@Test
- public void testShortCircuitReadBlockFileCorruption() throws IOException,
- InterruptedException {
- Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
- getClusterBuilder().setRamDiskReplicaCapacity(1 + EVICTION_LOW_WATERMARK)
- .setUseScr(true)
+ public void testScrBlockFileCorruption() throws IOException,
+ InterruptedException, TimeoutException {
+ getClusterBuilder().setUseScr(true)
.setUseLegacyBlockReaderLocal(false)
.build();
doShortCircuitReadBlockFileCorruptionTest();
}
@Test
- public void testLegacyShortCircuitReadBlockFileCorruption() throws IOException,
- InterruptedException {
- getClusterBuilder().setRamDiskReplicaCapacity(1 + EVICTION_LOW_WATERMARK)
- .setUseScr(true)
+ public void testLegacyScrBlockFileCorruption() throws IOException,
+ InterruptedException, TimeoutException {
+ getClusterBuilder().setUseScr(true)
.setUseLegacyBlockReaderLocal(true)
.build();
doShortCircuitReadBlockFileCorruptionTest();
}
public void doShortCircuitReadBlockFileCorruptionTest() throws IOException,
- InterruptedException {
+ InterruptedException, TimeoutException {
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
- Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
- final int SEED = 0xFADED;
- makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
+ makeTestFile(path1, BLOCK_SIZE, true);
ensureFileReplicasOnStorageType(path1, RAM_DISK);
-
- // Create another file with a replica on RAM_DISK, which evicts the first.
- makeRandomTestFile(path2, BLOCK_SIZE, true, SEED);
-
- // Sleep for a short time to allow the lazy writer thread to do its job.
- Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
- triggerBlockReport();
+ waitForMetric("RamDiskBlocksLazyPersisted", 1);
+ triggerEviction(cluster.getDataNodes().get(0));
// Corrupt the lazy-persisted block file, and verify that checksum
// verification catches it.
@@ -277,42 +234,32 @@ public class TestScrLazyPersistFiles extends LazyPersistTestCase {
}
@Test
- public void testShortCircuitReadMetaFileCorruption() throws IOException,
- InterruptedException {
- Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
- getClusterBuilder().setRamDiskReplicaCapacity(1 + EVICTION_LOW_WATERMARK)
- .setUseScr(true)
+ public void testScrMetaFileCorruption() throws IOException,
+ InterruptedException, TimeoutException {
+ getClusterBuilder().setUseScr(true)
.setUseLegacyBlockReaderLocal(false)
.build();
doShortCircuitReadMetaFileCorruptionTest();
}
@Test
- public void testLegacyShortCircuitReadMetaFileCorruption() throws IOException,
- InterruptedException {
- getClusterBuilder().setRamDiskReplicaCapacity(1 + EVICTION_LOW_WATERMARK)
- .setUseScr(true)
+ public void testLegacyScrMetaFileCorruption() throws IOException,
+ InterruptedException, TimeoutException {
+ getClusterBuilder().setUseScr(true)
.setUseLegacyBlockReaderLocal(true)
.build();
doShortCircuitReadMetaFileCorruptionTest();
}
public void doShortCircuitReadMetaFileCorruptionTest() throws IOException,
- InterruptedException {
+ InterruptedException, TimeoutException {
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
- Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
- final int SEED = 0xFADED;
- makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
+ makeTestFile(path1, BLOCK_SIZE, true);
ensureFileReplicasOnStorageType(path1, RAM_DISK);
-
- // Create another file with a replica on RAM_DISK, which evicts the first.
- makeRandomTestFile(path2, BLOCK_SIZE, true, SEED);
-
- // Sleep for a short time to allow the lazy writer thread to do its job.
- Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
- triggerBlockReport();
+ waitForMetric("RamDiskBlocksLazyPersisted", 1);
+ triggerEviction(cluster.getDataNodes().get(0));
// Corrupt the lazy-persisted checksum file, and verify that checksum
// verification catches it.
[2/2] hadoop git commit: HDFS-8192. Eviction should key off used locked memory instead of ram disk free space. (Contributed by Arpit Agarwal)
Posted by ar...@apache.org.
HDFS-8192. Eviction should key off used locked memory instead of ram disk free space. (Contributed by Arpit Agarwal)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/175c5829
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/175c5829
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/175c5829
Branch: refs/heads/branch-2
Commit: 175c5829f0bf86c191892dda8953ecefeac63aa7
Parents: 5dcda16
Author: Arpit Agarwal <ar...@apache.org>
Authored: Sat Jun 20 13:27:52 2015 -0700
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Sat Jun 20 13:27:58 2015 -0700
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 4 -
.../datanode/fsdataset/impl/FsDatasetCache.java | 7 +
.../datanode/fsdataset/impl/FsDatasetImpl.java | 98 +++++-----
.../hdfs/server/balancer/TestBalancer.java | 1 -
.../fsdataset/impl/LazyPersistTestCase.java | 42 ++--
.../impl/TestLazyPersistLockedMemory.java | 25 ++-
.../impl/TestLazyPersistReplicaPlacement.java | 36 +++-
.../datanode/fsdataset/impl/TestLazyWriter.java | 62 +++---
.../fsdataset/impl/TestScrLazyPersistFiles.java | 193 +++++++------------
10 files changed, 224 insertions(+), 247 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/175c5829/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index f4b68d6..6e7030e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -314,6 +314,9 @@ Release 2.8.0 - UNRELEASED
do not generate spurious reconfig warnings (Lei (Eddy) Xu via Colin P.
McCabe)
+ HDFS-8192. Eviction should key off used locked memory instead of
+ ram disk free space. (Arpit Agarwal)
+
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
http://git-wip-us.apache.org/repos/asf/hadoop/blob/175c5829/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index e396f53..2e6d256 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -94,10 +94,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC = 60;
public static final String DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_KEY = "dfs.datanode.ram.disk.replica.tracker";
public static final Class<RamDiskReplicaLruTracker> DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_DEFAULT = RamDiskReplicaLruTracker.class;
- public static final String DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT = "dfs.datanode.ram.disk.low.watermark.percent";
- public static final float DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT_DEFAULT = 10.0f;
- public static final String DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES = "dfs.datanode.ram.disk.low.watermark.bytes";
- public static final long DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES_DEFAULT = DFS_BLOCK_SIZE_DEFAULT;
public static final String DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY = "dfs.datanode.network.counts.cache.max.size";
public static final int DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT = Integer.MAX_VALUE;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/175c5829/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
index 6f524b2..f70d4af 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
@@ -405,6 +405,13 @@ public class FsDatasetCache {
}
/**
+ * Round up to the OS page size.
+ */
+ long roundUpPageSize(long count) {
+ return usedBytesCount.rounder.roundUp(count);
+ }
+
+ /**
* Background worker that mmaps, mlocks, and checksums a block
*/
private class CachingTask implements Runnable {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/175c5829/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 167f0d7..8bfe7ae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -1304,14 +1304,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
if (allowLazyPersist &&
lazyWriter != null &&
b.getNumBytes() % cacheManager.getOsPageSize() == 0 &&
- (cacheManager.reserve(b.getNumBytes())) > 0) {
+ reserveLockedMemory(b.getNumBytes())) {
try {
// First try to place the block on a transient volume.
ref = volumes.getNextTransientVolume(b.getNumBytes());
datanode.getMetrics().incrRamDiskBlocksWrite();
} catch(DiskOutOfSpaceException de) {
// Ignore the exception since we just fall back to persistent storage.
- datanode.getMetrics().incrRamDiskBlocksWriteFallback();
} finally {
if (ref == null) {
cacheManager.release(b.getNumBytes());
@@ -1325,6 +1324,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
// create an rbw file to hold block in the designated volume
+
+ if (allowLazyPersist && !v.isTransientStorage()) {
+ datanode.getMetrics().incrRamDiskBlocksWriteFallback();
+ }
+
File f;
try {
f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
@@ -2834,20 +2838,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
class LazyWriter implements Runnable {
private volatile boolean shouldRun = true;
final int checkpointerInterval;
- final float lowWatermarkFreeSpacePercentage;
- final long lowWatermarkFreeSpaceBytes;
-
public LazyWriter(Configuration conf) {
this.checkpointerInterval = conf.getInt(
DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC);
- this.lowWatermarkFreeSpacePercentage = conf.getFloat(
- DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT,
- DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT_DEFAULT);
- this.lowWatermarkFreeSpaceBytes = conf.getLong(
- DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES,
- DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES_DEFAULT);
}
/**
@@ -2909,41 +2904,17 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
return succeeded;
}
- private boolean transientFreeSpaceBelowThreshold() throws IOException {
- long free = 0;
- long capacity = 0;
- float percentFree = 0.0f;
-
- // Don't worry about fragmentation for now. We don't expect more than one
- // transient volume per DN.
- try (FsVolumeReferences volumes = getFsVolumeReferences()) {
- for (FsVolumeSpi fvs : volumes) {
- FsVolumeImpl v = (FsVolumeImpl) fvs;
- if (v.isTransientStorage()) {
- capacity += v.getCapacity();
- free += v.getAvailable();
- }
- }
- }
-
- if (capacity == 0) {
- return false;
- }
-
- percentFree = (float) ((double)free * 100 / capacity);
- return (percentFree < lowWatermarkFreeSpacePercentage) ||
- (free < lowWatermarkFreeSpaceBytes);
- }
-
/**
* Attempt to evict one or more transient block replicas until we
- * have at least spaceNeeded bytes free.
+ * have at least bytesNeeded bytes free.
*/
- private void evictBlocks() throws IOException {
+ public void evictBlocks(long bytesNeeded) throws IOException {
int iterations = 0;
+ final long cacheCapacity = cacheManager.getCacheCapacity();
+
while (iterations++ < MAX_BLOCK_EVICTIONS_PER_ITERATION &&
- transientFreeSpaceBelowThreshold()) {
+ (cacheCapacity - cacheManager.getCacheUsed()) < bytesNeeded) {
RamDiskReplica replicaState = ramDiskReplicaTracker.getNextCandidateForEviction();
if (replicaState == null) {
@@ -2960,7 +2931,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
final String bpid = replicaState.getBlockPoolId();
synchronized (FsDatasetImpl.this) {
- replicaInfo = getReplicaInfo(replicaState.getBlockPoolId(), replicaState.getBlockId());
+ replicaInfo = getReplicaInfo(replicaState.getBlockPoolId(),
+ replicaState.getBlockId());
Preconditions.checkState(replicaInfo.getVolume().isTransientStorage());
blockFile = replicaInfo.getBlockFile();
metaFile = replicaInfo.getMetaFile();
@@ -2969,7 +2941,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
ramDiskReplicaTracker.discardReplica(replicaState.getBlockPoolId(),
replicaState.getBlockId(), false);
- // Move the replica from lazyPersist/ to finalized/ on target volume
+ // Move the replica from lazyPersist/ to finalized/ on
+ // the target volume
BlockPoolSlice bpSlice =
replicaState.getLazyPersistVolume().getBlockPoolSlice(bpid);
File newBlockFile = bpSlice.activateSavedReplica(
@@ -2993,10 +2966,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
if (replicaState.getNumReads() == 0) {
datanode.getMetrics().incrRamDiskBlocksEvictedWithoutRead();
}
- }
- removeOldReplica(replicaInfo, newReplicaInfo, blockFile, metaFile,
- blockFileUsed, metaFileUsed, bpid);
+ // Delete the block+meta files from RAM disk and release locked
+ // memory.
+ removeOldReplica(replicaInfo, newReplicaInfo, blockFile, metaFile,
+ blockFileUsed, metaFileUsed, bpid);
+ }
}
}
@@ -3007,7 +2982,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
while (fsRunning && shouldRun) {
try {
numSuccessiveFailures = saveNextReplica() ? 0 : (numSuccessiveFailures + 1);
- evictBlocks();
// Sleep if we have no more work to do or if it looks like we are not
// making any forward progress. This is to ensure that if all persist
@@ -3095,5 +3069,37 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
cacheManager.releaseRoundDown(count);
}
}
+
+ /**
+   * Attempt to evict blocks from the cache manager to free up the
+   * requested number of bytes.
+   *
+   * @param bytesNeeded number of bytes to free
+ */
+ @VisibleForTesting
+ public void evictLazyPersistBlocks(long bytesNeeded) {
+ try {
+ ((LazyWriter) lazyWriter.getRunnable()).evictBlocks(bytesNeeded);
+ } catch (IOException ioe) {
+ LOG.info("Ignoring exception", ioe);
+ }
+ }
+
+ /**
+   * Attempt to reserve the given amount of memory with the cache manager.
+   * @param bytesNeeded number of bytes to reserve
+   * @return true if the reservation succeeded, false otherwise
+ */
+ boolean reserveLockedMemory(long bytesNeeded) {
+ if (cacheManager.reserve(bytesNeeded) > 0) {
+ return true;
+ }
+
+ // Round up bytes needed to osPageSize and attempt to evict
+ // one or more blocks to free up space for the reservation.
+ bytesNeeded = cacheManager.roundUpPageSize(bytesNeeded);
+ evictLazyPersistBlocks(bytesNeeded);
+ return cacheManager.reserve(bytesNeeded) > 0;
+ }
}
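----------------------------------------------------------------------
reserveLockedMemory() above is the heart of this patch: a writer first
tries to reserve locked memory with the cache manager, and only if that
fails does it round the request up to the OS page size, evict
already-persisted replicas, and retry. The following self-contained Java
sketch illustrates that reserve/evict/retry pattern under simplified
assumptions; LockedMemoryBudget and its replica queue are illustrative
stand-ins, not the actual FsDatasetCache or LazyWriter classes.

    import java.util.ArrayDeque;
    import java.util.Deque;

    class LockedMemoryBudget {
      private final long capacity;    // cf. dfs.datanode.max.locked.memory
      private final long osPageSize;
      private long used;
      // Already-persisted replicas, oldest first: the eviction candidates.
      private final Deque<Long> persistedReplicaSizes = new ArrayDeque<Long>();

      LockedMemoryBudget(long capacity, long osPageSize) {
        this.capacity = capacity;
        this.osPageSize = osPageSize;
      }

      /** Try to lock bytes; true on success, false if over capacity. */
      synchronized boolean reserve(long bytes) {
        if (used + bytes > capacity) {
          return false;
        }
        used += bytes;
        return true;
      }

      synchronized void release(long bytes) {
        used -= bytes;
      }

      /** Record a replica whose locked memory may be reclaimed later. */
      synchronized void onReplicaPersisted(long lockedBytes) {
        persistedReplicaSizes.add(lockedBytes);
      }

      long roundUpPageSize(long bytes) {
        return ((bytes + osPageSize - 1) / osPageSize) * osPageSize;
      }

      /** Evict persisted replicas until bytesNeeded are free or none remain. */
      synchronized void evictBlocks(long bytesNeeded) {
        while ((capacity - used) < bytesNeeded
            && !persistedReplicaSizes.isEmpty()) {
          release(persistedReplicaSizes.poll());  // drop one replica's lock
        }
      }

      /** Mirrors the shape of FsDatasetImpl#reserveLockedMemory above. */
      synchronized boolean reserveWithEviction(long bytesNeeded) {
        if (reserve(bytesNeeded)) {
          return true;
        }
        bytesNeeded = roundUpPageSize(bytesNeeded);
        evictBlocks(bytesNeeded);
        return reserve(bytesNeeded);
      }
    }

The key design point, visible in both the sketch and the patch, is that
eviction is now keyed off the locked-memory budget rather than RAM disk
free space: a failed reservation is what triggers it.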
http://git-wip-us.apache.org/repos/asf/hadoop/blob/175c5829/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index 69dd54b..65427e9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -130,7 +130,6 @@ public class TestBalancer {
conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500);
conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC, 1);
- conf.setInt(DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES, DEFAULT_RAM_DISK_BLOCK_SIZE);
LazyPersistTestCase.initCacheManipulator();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/175c5829/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java
index 5ce5cc6..ce29fc8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+import com.google.common.base.Supplier;
+import org.apache.commons.lang.UnhandledException;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import static org.apache.hadoop.fs.CreateFlag.CREATE;
@@ -37,6 +39,7 @@ import java.util.EnumSet;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.TimeoutException;
import com.google.common.base.Preconditions;
import org.apache.commons.io.IOUtils;
@@ -55,6 +58,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@@ -79,7 +83,6 @@ public abstract class LazyPersistTestCase {
protected static final int BLOCK_SIZE = 5 * 1024 * 1024;
protected static final int BUFFER_LENGTH = 4096;
- protected static final int EVICTION_LOW_WATERMARK = 1;
private static final long HEARTBEAT_INTERVAL_SEC = 1;
private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
private static final String JMX_RAM_DISK_METRICS_PATTERN = "^RamDisk";
@@ -236,7 +239,6 @@ public abstract class LazyPersistTestCase {
StorageType[] storageTypes,
int ramDiskReplicaCapacity,
long ramDiskStorageLimit,
- long evictionLowWatermarkReplicas,
long maxLockedMemory,
boolean useSCR,
boolean useLegacyBlockReaderLocal,
@@ -256,8 +258,6 @@ public abstract class LazyPersistTestCase {
HEARTBEAT_RECHECK_INTERVAL_MSEC);
conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
LAZY_WRITER_INTERVAL_SEC);
- conf.setLong(DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES,
- evictionLowWatermarkReplicas * BLOCK_SIZE);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY, 1);
conf.setLong(DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, maxLockedMemory);
@@ -389,12 +389,6 @@ public abstract class LazyPersistTestCase {
return this;
}
- public ClusterWithRamDiskBuilder setEvictionLowWatermarkReplicas(
- long evictionLowWatermarkReplicas) {
- this.evictionLowWatermarkReplicas = evictionLowWatermarkReplicas;
- return this;
- }
-
public ClusterWithRamDiskBuilder disableScrubber() {
this.disableScrubber = true;
return this;
@@ -403,8 +397,8 @@ public abstract class LazyPersistTestCase {
public void build() throws IOException {
LazyPersistTestCase.this.startUpCluster(
numDatanodes, hasTransientStorage, storageTypes, ramDiskReplicaCapacity,
- ramDiskStorageLimit, evictionLowWatermarkReplicas,
- maxLockedMemory, useScr, useLegacyBlockReaderLocal, disableScrubber);
+ ramDiskStorageLimit, maxLockedMemory, useScr, useLegacyBlockReaderLocal,
+ disableScrubber);
}
private int numDatanodes = REPL_FACTOR;
@@ -415,7 +409,6 @@ public abstract class LazyPersistTestCase {
private boolean hasTransientStorage = true;
private boolean useScr = false;
private boolean useLegacyBlockReaderLocal = false;
- private long evictionLowWatermarkReplicas = EVICTION_LOW_WATERMARK;
private boolean disableScrubber=false;
}
@@ -513,4 +506,27 @@ public abstract class LazyPersistTestCase {
e.printStackTrace();
}
}
+
+ protected void waitForMetric(final String metricName, final int expectedValue)
+ throws TimeoutException, InterruptedException {
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ try {
+ final int currentValue = Integer.parseInt(jmx.getValue(metricName));
+ LOG.info("Waiting for " + metricName +
+ " to reach value " + expectedValue +
+ ", current value = " + currentValue);
+ return currentValue == expectedValue;
+ } catch (Exception e) {
+ throw new UnhandledException("Test failed due to unexpected exception", e);
+ }
+ }
+ }, 1000, Integer.MAX_VALUE);
+ }
+
+ protected void triggerEviction(DataNode dn) {
+ FsDatasetImpl fsDataset = (FsDatasetImpl) dn.getFSDataset();
+ fsDataset.evictLazyPersistBlocks(Long.MAX_VALUE); // Run one eviction cycle.
+ }
}
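----------------------------------------------------------------------
The waitForMetric() helper above replaces fixed Thread.sleep() calls
with polling: it re-reads a JMX counter until it reaches the expected
value. Stripped of the Hadoop and Guava test utilities, the same
poll-until-condition pattern looks roughly like this (a sketch using
Java 8's IntSupplier; pollMetric is a hypothetical stand-in for reading
the counter):

    import java.util.concurrent.TimeoutException;
    import java.util.function.IntSupplier;

    final class PollUntil {
      static void waitForValue(IntSupplier pollMetric, int expected,
          long checkEveryMs, long timeoutMs)
          throws TimeoutException, InterruptedException {
        final long deadline = System.currentTimeMillis() + timeoutMs;
        while (pollMetric.getAsInt() != expected) {   // poll the counter
          if (System.currentTimeMillis() >= deadline) {
            throw new TimeoutException("Value never reached " + expected);
          }
          Thread.sleep(checkEveryMs);                 // back off, then retry
        }
      }
    }

Polling on the metric is why most of the
Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000) calls disappear from
the tests below: the tests become deterministic instead of timing
dependent.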
http://git-wip-us.apache.org/repos/asf/hadoop/blob/175c5829/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistLockedMemory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistLockedMemory.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistLockedMemory.java
index 9ea4665..eef8f0b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistLockedMemory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistLockedMemory.java
@@ -28,9 +28,7 @@ import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
-import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.test.MetricsAsserts;
import org.junit.Test;
import java.io.IOException;
@@ -103,25 +101,26 @@ public class TestLazyPersistLockedMemory extends LazyPersistTestCase {
* Verify that locked RAM is released when blocks are evicted from RAM disk.
*/
@Test
- public void testReleaseOnEviction()
- throws IOException, TimeoutException, InterruptedException {
+ public void testReleaseOnEviction() throws Exception {
getClusterBuilder().setNumDatanodes(1)
.setMaxLockedMemory(BLOCK_SIZE)
.setRamDiskReplicaCapacity(BLOCK_SIZE * 2 - 1)
.build();
final String METHOD_NAME = GenericTestUtils.getMethodName();
- final FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset();
+ final FsDatasetImpl fsd =
+ (FsDatasetImpl) cluster.getDataNodes().get(0).getFSDataset();
- Path path = new Path("/" + METHOD_NAME + ".dat");
- makeTestFile(path, BLOCK_SIZE, true);
+ Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+ makeTestFile(path1, BLOCK_SIZE, true);
+ assertThat(fsd.getCacheUsed(), is((long) BLOCK_SIZE));
- // The block should get evicted soon since it pushes RAM disk free
- // space below the threshold.
- waitForLockedBytesUsed(fsd, 0);
+ // Wait until the replica is written to persistent storage.
+ waitForMetric("RamDiskBlocksLazyPersisted", 1);
- MetricsRecordBuilder rb =
- MetricsAsserts.getMetrics(cluster.getDataNodes().get(0).getMetrics().name());
- MetricsAsserts.assertCounter("RamDiskBlocksEvicted", 1L, rb);
+ // Trigger eviction and verify locked bytes were released.
+ fsd.evictLazyPersistBlocks(Long.MAX_VALUE);
+ verifyRamDiskJMXMetric("RamDiskBlocksEvicted", 1);
+ waitForLockedBytesUsed(fsd, 0);
}
/**
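----------------------------------------------------------------------
The assertions above read DataNode counters such as RamDiskBlocksEvicted
over JMX. For reference, reading an attribute from the platform MBean
server looks like the sketch below; the bean name shown is an
illustrative assumption, since the test base class resolves the real
DataNode metrics bean at startup.

    import java.lang.management.ManagementFactory;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    final class JmxCounterReader {
      /** Read a numeric attribute from a named MBean. */
      static long readCounter(String beanName, String attribute)
          throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        Object value = server.getAttribute(new ObjectName(beanName), attribute);
        return Long.parseLong(String.valueOf(value));
      }

      public static void main(String[] args) throws Exception {
        // Hypothetical bean name, for illustration only.
        long evicted = readCounter(
            "Hadoop:service=DataNode,name=DataNodeMetrics",
            "RamDiskBlocksEvicted");
        System.out.println("RamDiskBlocksEvicted = " + evicted);
      }
    }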
http://git-wip-us.apache.org/repos/asf/hadoop/blob/175c5829/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistReplicaPlacement.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistReplicaPlacement.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistReplicaPlacement.java
index 018eaba..c89475a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistReplicaPlacement.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistReplicaPlacement.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.test.GenericTestUtils;
@@ -28,6 +29,8 @@ import java.io.IOException;
import static org.apache.hadoop.fs.StorageType.DEFAULT;
import static org.apache.hadoop.fs.StorageType.RAM_DISK;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.fail;
public class TestLazyPersistReplicaPlacement extends LazyPersistTestCase {
@@ -70,32 +73,50 @@ public class TestLazyPersistReplicaPlacement extends LazyPersistTestCase {
ensureFileReplicasOnStorageType(path, DEFAULT);
}
+ @Test
+ public void testSynchronousEviction() throws Exception {
+ getClusterBuilder().setMaxLockedMemory(BLOCK_SIZE).build();
+ final String METHOD_NAME = GenericTestUtils.getMethodName();
+
+ final Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+ makeTestFile(path1, BLOCK_SIZE, true);
+ ensureFileReplicasOnStorageType(path1, RAM_DISK);
+
+ // Wait until the replica is written to persistent storage.
+ waitForMetric("RamDiskBlocksLazyPersisted", 1);
+
+ // Ensure that writing a new file to RAM DISK evicts the block
+ // for the previous one.
+ Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
+ makeTestFile(path2, BLOCK_SIZE, true);
+ verifyRamDiskJMXMetric("RamDiskBlocksEvictedWithoutRead", 1);
+ }
+
/**
* File can not fit in RamDisk even with eviction
* @throws IOException
*/
@Test
public void testFallbackToDiskFull() throws Exception {
- getClusterBuilder().setRamDiskReplicaCapacity(0).build();
+ getClusterBuilder().setMaxLockedMemory(BLOCK_SIZE / 2).build();
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path = new Path("/" + METHOD_NAME + ".dat");
makeTestFile(path, BLOCK_SIZE, true);
ensureFileReplicasOnStorageType(path, DEFAULT);
-
verifyRamDiskJMXMetric("RamDiskBlocksWriteFallback", 1);
}
/**
* File partially fit in RamDisk after eviction.
* RamDisk can fit 2 blocks. Write a file with 5 blocks.
- * Expect 2 or less blocks are on RamDisk and 3 or more on disk.
+ * Expect exactly 2 blocks on RAM disk and the rest on disk.
* @throws IOException
*/
@Test
public void testFallbackToDiskPartial()
throws IOException, InterruptedException {
- getClusterBuilder().setRamDiskReplicaCapacity(2).build();
+ getClusterBuilder().setMaxLockedMemory(2 * BLOCK_SIZE).build();
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path = new Path("/" + METHOD_NAME + ".dat");
@@ -122,8 +143,8 @@ public class TestLazyPersistReplicaPlacement extends LazyPersistTestCase {
- // Since eviction is asynchronous, depending on the timing of eviction
- // wrt writes, we may get 2 or less blocks on RAM disk.
+ // Eviction is now triggered synchronously when locked memory fills up,
+ // so exactly 2 blocks fit on RAM disk and the rest go to disk.
- assert(numBlocksOnRamDisk <= 2);
- assert(numBlocksOnDisk >= 3);
+ assertThat(numBlocksOnRamDisk, is(2));
+ assertThat(numBlocksOnDisk, is(3));
}
/**
@@ -134,7 +155,8 @@ public class TestLazyPersistReplicaPlacement extends LazyPersistTestCase {
*/
@Test
public void testRamDiskNotChosenByDefault() throws IOException {
- getClusterBuilder().build();
+ getClusterBuilder().setStorageTypes(new StorageType[] {RAM_DISK, RAM_DISK})
+ .build();
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path = new Path("/" + METHOD_NAME + ".dat");
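----------------------------------------------------------------------
testFallbackToDiskFull above exercises the write-time fallback added in
FsDatasetImpl: when the locked-memory reservation fails even after
eviction, the replica is created on DEFAULT storage and
RamDiskBlocksWriteFallback is incremented. A rough sketch of that
decision, reusing the hypothetical LockedMemoryBudget type from the
earlier sketch:

    // StorageChoice and WriteFallbackSketch are illustrative types,
    // not Hadoop's.
    enum StorageChoice { RAM_DISK, DEFAULT }

    final class WriteFallbackSketch {
      private long ramDiskBlocksWriteFallback;  // mirrored by the JMX metric

      StorageChoice chooseStorage(LockedMemoryBudget budget, long blockBytes,
          boolean lazyPersistRequested) {
        if (lazyPersistRequested && budget.reserveWithEviction(blockBytes)) {
          return StorageChoice.RAM_DISK;        // memory secured, use RAM disk
        }
        if (lazyPersistRequested) {
          ramDiskBlocksWriteFallback++;         // record the fallback
        }
        return StorageChoice.DEFAULT;
      }
    }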
http://git-wip-us.apache.org/repos/asf/hadoop/blob/175c5829/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyWriter.java
index ee8aaf0..6b16066 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyWriter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyWriter.java
@@ -28,6 +28,7 @@ import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.concurrent.TimeoutException;
import static org.apache.hadoop.fs.StorageType.DEFAULT;
import static org.apache.hadoop.fs.StorageType.RAM_DISK;
@@ -38,18 +39,16 @@ import static org.junit.Assert.assertTrue;
public class TestLazyWriter extends LazyPersistTestCase {
@Test
public void testLazyPersistBlocksAreSaved()
- throws IOException, InterruptedException {
+ throws IOException, InterruptedException, TimeoutException {
getClusterBuilder().build();
+ final int NUM_BLOCKS = 10;
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path = new Path("/" + METHOD_NAME + ".dat");
// Create a test file
- makeTestFile(path, BLOCK_SIZE * 10, true);
+ makeTestFile(path, BLOCK_SIZE * NUM_BLOCKS, true);
LocatedBlocks locatedBlocks = ensureFileReplicasOnStorageType(path, RAM_DISK);
-
- // Sleep for a short time to allow the lazy writer thread to do its job
- Thread.sleep(6 * LAZY_WRITER_INTERVAL_SEC * 1000);
-
+ waitForMetric("RamDiskBlocksLazyPersisted", NUM_BLOCKS);
LOG.info("Verifying copy was saved to lazyPersist/");
// Make sure that there is a saved copy of the replica on persistent
@@ -57,35 +56,22 @@ public class TestLazyWriter extends LazyPersistTestCase {
ensureLazyPersistBlocksAreSaved(locatedBlocks);
}
- /**
- * RamDisk eviction after lazy persist to disk.
- * @throws Exception
- */
@Test
- public void testRamDiskEviction() throws Exception {
- getClusterBuilder().setRamDiskReplicaCapacity(1 + EVICTION_LOW_WATERMARK).build();
+ public void testSynchronousEviction() throws Exception {
+ getClusterBuilder().setMaxLockedMemory(BLOCK_SIZE).build();
final String METHOD_NAME = GenericTestUtils.getMethodName();
- Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
- Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
- final int SEED = 0xFADED;
- makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
+ final Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+ makeTestFile(path1, BLOCK_SIZE, true);
ensureFileReplicasOnStorageType(path1, RAM_DISK);
- // Sleep for a short time to allow the lazy writer thread to do its job.
- Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
- ensureFileReplicasOnStorageType(path1, RAM_DISK);
+ // Wait until the replica is written to persistent storage.
+ waitForMetric("RamDiskBlocksLazyPersisted", 1);
- // Create another file with a replica on RAM_DISK.
+ // Ensure that writing a new file to RAM DISK evicts the block
+ // for the previous one.
+ Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
makeTestFile(path2, BLOCK_SIZE, true);
- Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
- triggerBlockReport();
-
- // Ensure the first file was evicted to disk, the second is still on
- // RAM_DISK.
- ensureFileReplicasOnStorageType(path2, RAM_DISK);
- ensureFileReplicasOnStorageType(path1, DEFAULT);
-
verifyRamDiskJMXMetric("RamDiskBlocksEvicted", 1);
verifyRamDiskJMXMetric("RamDiskBlocksEvictedWithoutRead", 1);
}
@@ -98,8 +84,8 @@ public class TestLazyWriter extends LazyPersistTestCase {
*/
@Test
public void testRamDiskEvictionBeforePersist()
- throws IOException, InterruptedException {
- getClusterBuilder().setRamDiskReplicaCapacity(1).build();
+ throws Exception {
+ getClusterBuilder().setMaxLockedMemory(BLOCK_SIZE).build();
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
@@ -116,6 +102,7 @@ public class TestLazyWriter extends LazyPersistTestCase {
// Eviction should not happen for block of the first file that is not
// persisted yet.
+ verifyRamDiskJMXMetric("RamDiskBlocksEvicted", 0);
ensureFileReplicasOnStorageType(path1, RAM_DISK);
ensureFileReplicasOnStorageType(path2, DEFAULT);
@@ -133,7 +120,7 @@ public class TestLazyWriter extends LazyPersistTestCase {
public void testRamDiskEvictionIsLru()
throws Exception {
final int NUM_PATHS = 5;
- getClusterBuilder().setRamDiskReplicaCapacity(NUM_PATHS + EVICTION_LOW_WATERMARK).build();
+ getClusterBuilder().setMaxLockedMemory(NUM_PATHS * BLOCK_SIZE).build();
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path paths[] = new Path[NUM_PATHS * 2];
@@ -145,8 +132,7 @@ public class TestLazyWriter extends LazyPersistTestCase {
makeTestFile(paths[i], BLOCK_SIZE, true);
}
- // Sleep for a short time to allow the lazy writer thread to do its job.
- Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+ waitForMetric("RamDiskBlocksLazyPersisted", NUM_PATHS);
for (int i = 0; i < NUM_PATHS; ++i) {
ensureFileReplicasOnStorageType(paths[i], RAM_DISK);
@@ -227,16 +213,13 @@ public class TestLazyWriter extends LazyPersistTestCase {
makeTestFile(path, BLOCK_SIZE, true);
LocatedBlocks locatedBlocks = ensureFileReplicasOnStorageType(path, RAM_DISK);
-
- // Sleep for a short time to allow the lazy writer thread to do its job
- Thread.sleep(6 * LAZY_WRITER_INTERVAL_SEC * 1000);
+ waitForMetric("RamDiskBlocksLazyPersisted", 1);
// Delete after persist
client.delete(path.toString(), false);
Assert.assertFalse(fs.exists(path));
assertThat(verifyDeletedBlocks(locatedBlocks), is(true));
-
verifyRamDiskJMXMetric("RamDiskBlocksLazyPersisted", 1);
verifyRamDiskJMXMetric("RamDiskBytesLazyPersisted", BLOCK_SIZE);
}
@@ -248,7 +231,7 @@ public class TestLazyWriter extends LazyPersistTestCase {
*/
@Test
public void testDfsUsageCreateDelete()
- throws IOException, InterruptedException {
+ throws IOException, InterruptedException, TimeoutException {
getClusterBuilder().setRamDiskReplicaCapacity(4).build();
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path = new Path("/" + METHOD_NAME + ".dat");
@@ -261,8 +244,7 @@ public class TestLazyWriter extends LazyPersistTestCase {
assertThat(usedAfterCreate, is((long) BLOCK_SIZE));
- // Sleep for a short time to allow the lazy writer thread to do its job
- Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
+ waitForMetric("RamDiskBlocksLazyPersisted", 1);
long usedAfterPersist = fs.getUsed();
assertThat(usedAfterPersist, is((long) BLOCK_SIZE));
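----------------------------------------------------------------------
testRamDiskEvictionIsLru above verifies that eviction candidates are
chosen least-recently-used first, the policy RamDiskReplicaLruTracker
implements. A minimal way to get the same ordering in plain Java is an
access-ordered LinkedHashMap, sketched below; this mirrors the behavior
only, not Hadoop's actual tracker.

    import java.util.Iterator;
    import java.util.LinkedHashMap;
    import java.util.Map;

    final class LruTrackerSketch<K, V> {
      // true = access order, so iteration starts at the LRU entry.
      private final LinkedHashMap<K, V> entries =
          new LinkedHashMap<K, V>(16, 0.75f, true);

      /** put() and get() both count as accesses in access order. */
      synchronized void recordAccess(K key, V value) {
        entries.put(key, value);
      }

      /** Remove and return the least-recently-used entry, or null. */
      synchronized Map.Entry<K, V> nextCandidateForEviction() {
        Iterator<Map.Entry<K, V>> it = entries.entrySet().iterator();
        if (!it.hasNext()) {
          return null;
        }
        Map.Entry<K, V> eldest = it.next();
        it.remove();
        return eldest;
      }
    }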
http://git-wip-us.apache.org/repos/asf/hadoop/blob/175c5829/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestScrLazyPersistFiles.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestScrLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestScrLazyPersistFiles.java
index 7c7ba64..2512588 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestScrLazyPersistFiles.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestScrLazyPersistFiles.java
@@ -16,6 +16,7 @@
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+import com.google.common.base.Preconditions;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -26,6 +27,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.NativeCodeLoader;
@@ -39,13 +41,20 @@ import org.junit.rules.ExpectedException;
import java.io.File;
import java.io.IOException;
+import java.util.concurrent.TimeoutException;
import static org.apache.hadoop.fs.StorageType.DEFAULT;
import static org.apache.hadoop.fs.StorageType.RAM_DISK;
import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
+/**
+ * Test lazy persist behavior with short-circuit reads. These tests
+ * run on Linux only, with native IO enabled, and fake RAM_DISK
+ * storage using local disk.
+ */
public class TestScrLazyPersistFiles extends LazyPersistTestCase {
@BeforeClass
@@ -58,6 +67,10 @@ public class TestScrLazyPersistFiles extends LazyPersistTestCase {
Assume.assumeThat(NativeCodeLoader.isNativeCodeLoaded() && !Path.WINDOWS,
equalTo(true));
Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
+
+ final long osPageSize = NativeIO.POSIX.getCacheManipulator().getOperatingSystemPageSize();
+ Preconditions.checkState(BLOCK_SIZE >= osPageSize);
+ Preconditions.checkState(BLOCK_SIZE % osPageSize == 0);
}
@Rule
@@ -69,35 +82,27 @@ public class TestScrLazyPersistFiles extends LazyPersistTestCase {
*/
@Test
public void testRamDiskShortCircuitRead()
- throws IOException, InterruptedException {
- getClusterBuilder().setNumDatanodes(REPL_FACTOR)
- .setStorageTypes(new StorageType[]{RAM_DISK, DEFAULT})
- .setRamDiskStorageLimit(2 * BLOCK_SIZE - 1)
- .setUseScr(true)
- .build();
+ throws IOException, InterruptedException, TimeoutException {
+ getClusterBuilder().setUseScr(true).build();
final String METHOD_NAME = GenericTestUtils.getMethodName();
final int SEED = 0xFADED;
Path path = new Path("/" + METHOD_NAME + ".dat");
+ // Create a file and wait till it is persisted.
makeRandomTestFile(path, BLOCK_SIZE, true, SEED);
ensureFileReplicasOnStorageType(path, RAM_DISK);
+ waitForMetric("RamDiskBlocksLazyPersisted", 1);
- // Sleep for a short time to allow the lazy writer thread to do its job
- Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
-
- //assertThat(verifyReadRandomFile(path, BLOCK_SIZE, SEED), is(true));
- FSDataInputStream fis = fs.open(path);
+ HdfsDataInputStream fis = (HdfsDataInputStream) fs.open(path);
// Verify SCR read counters
try {
- fis = fs.open(path);
byte[] buf = new byte[BUFFER_LENGTH];
fis.read(0, buf, 0, BUFFER_LENGTH);
- HdfsDataInputStream dfsis = (HdfsDataInputStream) fis;
Assert.assertEquals(BUFFER_LENGTH,
- dfsis.getReadStatistics().getTotalBytesRead());
+ fis.getReadStatistics().getTotalBytesRead());
Assert.assertEquals(BUFFER_LENGTH,
- dfsis.getReadStatistics().getTotalShortCircuitBytesRead());
+ fis.getReadStatistics().getTotalShortCircuitBytesRead());
} finally {
fis.close();
fis = null;
@@ -111,106 +116,77 @@ public class TestScrLazyPersistFiles extends LazyPersistTestCase {
* @throws InterruptedException
*/
@Test
- public void testRamDiskEvictionWithShortCircuitReadHandle()
- throws IOException, InterruptedException {
- // 5 replica + delta, SCR.
- getClusterBuilder().setNumDatanodes(REPL_FACTOR)
- .setStorageTypes(new StorageType[]{RAM_DISK, DEFAULT})
- .setRamDiskStorageLimit(6 * BLOCK_SIZE - 1)
- .setEvictionLowWatermarkReplicas(3)
- .setUseScr(true)
- .build();
-
+ public void testScrDuringEviction()
+ throws Exception {
+ getClusterBuilder().setUseScr(true).build();
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
- Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
- final int SEED = 0xFADED;
- makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
+ // Create a file and wait till it is persisted.
+ makeTestFile(path1, BLOCK_SIZE, true);
ensureFileReplicasOnStorageType(path1, RAM_DISK);
+ waitForMetric("RamDiskBlocksLazyPersisted", 1);
- // Sleep for a short time to allow the lazy writer thread to do its job.
- // However the block replica should not be evicted from RAM_DISK yet.
- Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
-
- // No eviction should happen as the free ratio is below the threshold
- FSDataInputStream fis = fs.open(path1);
+ HdfsDataInputStream fis = (HdfsDataInputStream) fs.open(path1);
try {
// Keep and open read handle to path1 while creating path2
byte[] buf = new byte[BUFFER_LENGTH];
fis.read(0, buf, 0, BUFFER_LENGTH);
-
- // Create the 2nd file that will trigger RAM_DISK eviction.
- makeTestFile(path2, BLOCK_SIZE * 2, true);
- ensureFileReplicasOnStorageType(path2, RAM_DISK);
+ triggerEviction(cluster.getDataNodes().get(0));
// Ensure path1 is still readable from the open SCR handle.
- fis.read(fis.getPos(), buf, 0, BUFFER_LENGTH);
- HdfsDataInputStream dfsis = (HdfsDataInputStream) fis;
- Assert.assertEquals(2 * BUFFER_LENGTH,
- dfsis.getReadStatistics().getTotalBytesRead());
- Assert.assertEquals(2 * BUFFER_LENGTH,
- dfsis.getReadStatistics().getTotalShortCircuitBytesRead());
+ fis.read(0, buf, 0, BUFFER_LENGTH);
+ assertThat(fis.getReadStatistics().getTotalBytesRead(),
+ is((long) 2 * BUFFER_LENGTH));
+ assertThat(fis.getReadStatistics().getTotalShortCircuitBytesRead(),
+ is((long) 2 * BUFFER_LENGTH));
} finally {
IOUtils.closeQuietly(fis);
}
-
- // After the open handle is closed, path1 should be evicted to DISK.
- triggerBlockReport();
- ensureFileReplicasOnStorageType(path1, DEFAULT);
}
@Test
- public void testShortCircuitReadAfterEviction()
- throws IOException, InterruptedException {
- Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
- getClusterBuilder().setRamDiskReplicaCapacity(1 + EVICTION_LOW_WATERMARK)
- .setUseScr(true)
+ public void testScrAfterEviction()
+ throws IOException, InterruptedException, TimeoutException {
+ getClusterBuilder().setUseScr(true)
.setUseLegacyBlockReaderLocal(false)
.build();
doShortCircuitReadAfterEvictionTest();
}
@Test
- public void testLegacyShortCircuitReadAfterEviction()
- throws IOException, InterruptedException {
- getClusterBuilder().setRamDiskReplicaCapacity(1 + EVICTION_LOW_WATERMARK)
- .setUseScr(true)
+ public void testLegacyScrAfterEviction()
+ throws IOException, InterruptedException, TimeoutException {
+ getClusterBuilder().setUseScr(true)
.setUseLegacyBlockReaderLocal(true)
.build();
doShortCircuitReadAfterEvictionTest();
+
+ // In the implementation of legacy short-circuit reads, any failure is
+ // trapped silently, the client falls back to a remote read, and all
+ // subsequent legacy short-circuit reads in the ClientContext are
+ // disabled. Assert that legacy reads did not get disabled here.
+ ClientContext clientContext = client.getClientContext();
+ Assert.assertFalse(clientContext.getDisableLegacyBlockReaderLocal());
}
private void doShortCircuitReadAfterEvictionTest() throws IOException,
- InterruptedException {
+ InterruptedException, TimeoutException {
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
- Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
final int SEED = 0xFADED;
makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
+ ensureFileReplicasOnStorageType(path1, RAM_DISK);
+ waitForMetric("RamDiskBlocksLazyPersisted", 1);
// Verify short-circuit read from RAM_DISK.
- ensureFileReplicasOnStorageType(path1, RAM_DISK);
File metaFile = cluster.getBlockMetadataFile(0,
DFSTestUtil.getFirstBlock(fs, path1));
assertTrue(metaFile.length() <= BlockMetadataHeader.getHeaderSize());
assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED));
- // Sleep for a short time to allow the lazy writer thread to do its job.
- Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
-
- // Verify short-circuit read from RAM_DISK once again.
- ensureFileReplicasOnStorageType(path1, RAM_DISK);
- metaFile = cluster.getBlockMetadataFile(0,
- DFSTestUtil.getFirstBlock(fs, path1));
- assertTrue(metaFile.length() <= BlockMetadataHeader.getHeaderSize());
- assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED));
-
- // Create another file with a replica on RAM_DISK, which evicts the first.
- makeRandomTestFile(path2, BLOCK_SIZE, true, SEED);
- Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
- triggerBlockReport();
+ triggerEviction(cluster.getDataNodes().get(0));
// Verify short-circuit read still works from DEFAULT storage. This time,
// we'll have a checksum written during lazy persistence.
@@ -219,54 +195,35 @@ public class TestScrLazyPersistFiles extends LazyPersistTestCase {
DFSTestUtil.getFirstBlock(fs, path1));
assertTrue(metaFile.length() > BlockMetadataHeader.getHeaderSize());
assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED));
-
- // In the implementation of legacy short-circuit reads, any failure is
- // trapped silently, reverts back to a remote read, and also disables all
- // subsequent legacy short-circuit reads in the ClientContext. If the test
- // uses legacy, then assert that it didn't get disabled.
- ClientContext clientContext = client.getClientContext();
- if (clientContext.getUseLegacyBlockReaderLocal()) {
- Assert.assertFalse(clientContext.getDisableLegacyBlockReaderLocal());
- }
}
@Test
- public void testShortCircuitReadBlockFileCorruption() throws IOException,
- InterruptedException {
- Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
- getClusterBuilder().setRamDiskReplicaCapacity(1 + EVICTION_LOW_WATERMARK)
- .setUseScr(true)
+ public void testScrBlockFileCorruption() throws IOException,
+ InterruptedException, TimeoutException {
+ getClusterBuilder().setUseScr(true)
.setUseLegacyBlockReaderLocal(false)
.build();
doShortCircuitReadBlockFileCorruptionTest();
}
@Test
- public void testLegacyShortCircuitReadBlockFileCorruption() throws IOException,
- InterruptedException {
- getClusterBuilder().setRamDiskReplicaCapacity(1 + EVICTION_LOW_WATERMARK)
- .setUseScr(true)
+ public void testLegacyScrBlockFileCorruption() throws IOException,
+ InterruptedException, TimeoutException {
+ getClusterBuilder().setUseScr(true)
.setUseLegacyBlockReaderLocal(true)
.build();
doShortCircuitReadBlockFileCorruptionTest();
}
public void doShortCircuitReadBlockFileCorruptionTest() throws IOException,
- InterruptedException {
+ InterruptedException, TimeoutException {
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
- Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
- final int SEED = 0xFADED;
- makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
+ makeTestFile(path1, BLOCK_SIZE, true);
ensureFileReplicasOnStorageType(path1, RAM_DISK);
-
- // Create another file with a replica on RAM_DISK, which evicts the first.
- makeRandomTestFile(path2, BLOCK_SIZE, true, SEED);
-
- // Sleep for a short time to allow the lazy writer thread to do its job.
- Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
- triggerBlockReport();
+ waitForMetric("RamDiskBlocksLazyPersisted", 1);
+ triggerEviction(cluster.getDataNodes().get(0));
// Corrupt the lazy-persisted block file, and verify that checksum
// verification catches it.
@@ -277,42 +234,32 @@ public class TestScrLazyPersistFiles extends LazyPersistTestCase {
}
@Test
- public void testShortCircuitReadMetaFileCorruption() throws IOException,
- InterruptedException {
- Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
- getClusterBuilder().setRamDiskReplicaCapacity(1 + EVICTION_LOW_WATERMARK)
- .setUseScr(true)
+ public void testScrMetaFileCorruption() throws IOException,
+ InterruptedException, TimeoutException {
+ getClusterBuilder().setUseScr(true)
.setUseLegacyBlockReaderLocal(false)
.build();
doShortCircuitReadMetaFileCorruptionTest();
}
@Test
- public void testLegacyShortCircuitReadMetaFileCorruption() throws IOException,
- InterruptedException {
- getClusterBuilder().setRamDiskReplicaCapacity(1 + EVICTION_LOW_WATERMARK)
- .setUseScr(true)
+ public void testLegacyScrMetaFileCorruption() throws IOException,
+ InterruptedException, TimeoutException {
+ getClusterBuilder().setUseScr(true)
.setUseLegacyBlockReaderLocal(true)
.build();
doShortCircuitReadMetaFileCorruptionTest();
}
public void doShortCircuitReadMetaFileCorruptionTest() throws IOException,
- InterruptedException {
+ InterruptedException, TimeoutException {
final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
- Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
- final int SEED = 0xFADED;
- makeRandomTestFile(path1, BLOCK_SIZE, true, SEED);
+ makeTestFile(path1, BLOCK_SIZE, true);
ensureFileReplicasOnStorageType(path1, RAM_DISK);
-
- // Create another file with a replica on RAM_DISK, which evicts the first.
- makeRandomTestFile(path2, BLOCK_SIZE, true, SEED);
-
- // Sleep for a short time to allow the lazy writer thread to do its job.
- Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
- triggerBlockReport();
+ waitForMetric("RamDiskBlocksLazyPersisted", 1);
+ triggerEviction(cluster.getDataNodes().get(0));
// Corrupt the lazy-persisted checksum file, and verify that checksum
// verification catches it.