Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/05/21 01:48:37 UTC
[13/50] [abbrv] hadoop git commit: HDFS-8157. Writes to RAM DISK reserve locked memory for block files. (Arpit Agarwal)
HDFS-8157. Writes to RAM DISK reserve locked memory for block files. (Arpit Agarwal)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e453989a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e453989a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e453989a
Branch: refs/heads/HDFS-7240
Commit: e453989a5722e653bd97e3e54f9bbdffc9454fba
Parents: b0ad644
Author: Arpit Agarwal <ar...@apache.org>
Authored: Sat May 16 09:05:35 2015 -0700
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Sat May 16 09:05:35 2015 -0700
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../hdfs/server/datanode/ReplicaInPipeline.java | 11 +-
.../hdfs/server/datanode/ReplicaInfo.java | 12 +-
.../server/datanode/fsdataset/FsVolumeSpi.java | 8 +
.../datanode/fsdataset/impl/BlockPoolSlice.java | 2 +-
.../impl/FsDatasetAsyncDiskService.java | 7 +-
.../datanode/fsdataset/impl/FsDatasetCache.java | 85 +++++++-
.../datanode/fsdataset/impl/FsDatasetImpl.java | 106 ++++++----
.../datanode/fsdataset/impl/FsVolumeImpl.java | 20 +-
.../impl/RamDiskReplicaLruTracker.java | 19 +-
.../fsdataset/impl/RamDiskReplicaTracker.java | 12 +-
.../org/apache/hadoop/hdfs/MiniDFSCluster.java | 2 +-
.../hdfs/server/balancer/TestBalancer.java | 9 +-
.../server/datanode/SimulatedFSDataset.java | 4 +
.../server/datanode/TestDirectoryScanner.java | 9 +
.../server/datanode/TestFsDatasetCache.java | 4 +-
.../datanode/extdataset/ExternalVolumeImpl.java | 4 +
.../fsdataset/impl/LazyPersistTestCase.java | 57 ++++--
.../impl/TestLazyPersistLockedMemory.java | 201 +++++++++++++++++++
.../fsdataset/impl/TestWriteToReplica.java | 4 +-
20 files changed, 497 insertions(+), 82 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e453989a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 0c4d850..8c823ef 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -560,6 +560,9 @@ Release 2.8.0 - UNRELEASED
HDFS-8394. Move getAdditionalBlock() and related functionalities into a
separate class. (wheat9)
+ HDFS-8157. Writes to RAM DISK reserve locked memory for block files.
+ (Arpit Agarwal)
+
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e453989a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
index cc55f85..0eb143a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
@@ -51,7 +51,8 @@ public class ReplicaInPipeline extends ReplicaInfo
* the bytes already written to this block.
*/
private long bytesReserved;
-
+ private final long originalBytesReserved;
+
/**
* Constructor for a zero length replica
* @param blockId block id
@@ -97,6 +98,7 @@ public class ReplicaInPipeline extends ReplicaInfo
this.bytesOnDisk = len;
this.writer = writer;
this.bytesReserved = bytesToReserve;
+ this.originalBytesReserved = bytesToReserve;
}
/**
@@ -109,6 +111,7 @@ public class ReplicaInPipeline extends ReplicaInfo
this.bytesOnDisk = from.getBytesOnDisk();
this.writer = from.writer;
this.bytesReserved = from.bytesReserved;
+ this.originalBytesReserved = from.originalBytesReserved;
}
@Override
@@ -149,8 +152,14 @@ public class ReplicaInPipeline extends ReplicaInfo
}
@Override
+ public long getOriginalBytesReserved() {
+ return originalBytesReserved;
+ }
+
+ @Override
public void releaseAllBytesReserved() { // ReplicaInPipelineInterface
getVolume().releaseReservedSpace(bytesReserved);
+ getVolume().releaseLockedMemory(bytesReserved);
bytesReserved = 0;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e453989a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
index 940d3eb..136d8a9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
@@ -218,7 +218,17 @@ abstract public class ReplicaInfo extends Block implements Replica {
public long getBytesReserved() {
return 0;
}
-
+
+ /**
+ * Number of bytes originally reserved for this replica. The actual
+ * reservation is adjusted as data is written to disk.
+ *
+ * @return the number of bytes originally reserved for this replica.
+ */
+ public long getOriginalBytesReserved() {
+ return 0;
+ }
+
/**
* Copy specified file into a temporary file. Then rename the
* temporary file to the original name. This will cause any
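To make the adjustment described in the new javadoc concrete: when a replica is finalized at less than its originally reserved length, the unused portion of the locked-memory reservation can be returned. A minimal sketch, assuming the releaseLockedMemory method added to FsVolumeSpi later in this diff (variable names hypothetical):

    // Sketch: return the unused part of a reservation at finalize time.
    // releaseLockedMemory is a no-op on non-transient volumes and rounds
    // the count down to a multiple of the OS page size.
    long unusedBytes =
        replicaInfo.getOriginalBytesReserved() - replicaInfo.getNumBytes();
    volume.releaseLockedMemory(unusedBytes);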
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e453989a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
index 2a8f31b..8d1bb2a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
@@ -73,6 +73,14 @@ public interface FsVolumeSpi {
public void releaseReservedSpace(long bytesToRelease);
/**
* Release reserved memory for an RBW block written to transient storage
* i.e. RAM.
* bytesToRelease will be rounded down to a multiple of the OS page size
* since locked memory reservations must always be a multiple of the page
* size.
+ */
+ public void releaseLockedMemory(long bytesToRelease);
+
+ /**
* BlockIterator will return ExtendedBlock entries from a block pool in
* this volume. The entries will be returned in sorted order.<p/>
*
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e453989a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index a47d564..951c759 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -475,7 +475,7 @@ class BlockPoolSlice {
// eventually.
if (newReplica.getVolume().isTransientStorage()) {
lazyWriteReplicaMap.addReplica(bpid, blockId,
- (FsVolumeImpl) newReplica.getVolume());
+ (FsVolumeImpl) newReplica.getVolume(), 0);
} else {
lazyWriteReplicaMap.discardReplica(bpid, blockId, false);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e453989a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
index c1d3990..fdc9f83 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.io.File;
import java.io.FileDescriptor;
-import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -277,7 +276,8 @@ class FsDatasetAsyncDiskService {
@Override
public void run() {
- long dfsBytes = blockFile.length() + metaFile.length();
+ final long blockLength = blockFile.length();
+ final long metaLength = metaFile.length();
boolean result;
result = (trashDirectory == null) ? deleteFiles() : moveFiles();
@@ -291,7 +291,8 @@ class FsDatasetAsyncDiskService {
if(block.getLocalBlock().getNumBytes() != BlockCommand.NO_ACK){
datanode.notifyNamenodeDeletedBlock(block, volume.getStorageID());
}
- volume.decDfsUsed(block.getBlockPoolId(), dfsBytes);
+ volume.onBlockFileDeletion(block.getBlockPoolId(), blockLength);
+ volume.onMetaFileDeletion(block.getBlockPoolId(), metaLength);
LOG.info("Deleted " + block.getBlockPoolId() + " "
+ block.getLocalBlock() + " file " + blockFile);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e453989a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
index e0df0f2..6f524b2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
@@ -151,10 +151,15 @@ public class FsDatasetCache {
/**
* Round up a number to the operating system page size.
*/
- public long round(long count) {
- long newCount =
- (count + (osPageSize - 1)) / osPageSize;
- return newCount * osPageSize;
+ public long roundUp(long count) {
+ return (count + osPageSize - 1) & (~(osPageSize - 1));
+ }
+
+ /**
+ * Round down a number to the operating system page size.
+ */
+ public long roundDown(long count) {
+ return count & (~(osPageSize - 1));
}
}
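The bit-mask arithmetic above assumes osPageSize is a power of two, which holds for common page sizes such as 4096. A self-contained sketch with example values (class name hypothetical):

    // Sketch: page-size rounding via bit masks. ~(pageSize - 1) clears
    // the low-order (sub-page) bits of the count.
    class PageRounderSketch {
      private final long pageSize = 4096;

      long roundUp(long count) {
        // Adding (pageSize - 1) first makes any partial page round up.
        return (count + pageSize - 1) & ~(pageSize - 1);
      }

      long roundDown(long count) {
        // Clearing the low bits alone rounds down.
        return count & ~(pageSize - 1);
      }
    }
    // roundUp(1) == 4096; roundUp(4096) == 4096; roundDown(4097) == 4096.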
@@ -173,7 +178,7 @@ public class FsDatasetCache {
* -1 if we failed.
*/
long reserve(long count) {
- count = rounder.round(count);
+ count = rounder.roundUp(count);
while (true) {
long cur = usedBytes.get();
long next = cur + count;
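The hunk above cuts off mid-loop; for readers, a hedged reconstruction of the compare-and-swap pattern it uses (the maxBytes limit check is an assumption inferred from the "-1 if we failed" contract, not shown in this excerpt):

    // Sketch: CAS-based reservation against a shared AtomicLong counter.
    long reserve(long count, AtomicLong usedBytes, long maxBytes) {
      while (true) {
        long cur = usedBytes.get();
        long next = cur + count;
        if (next > maxBytes) {
          return -1;                     // over the locked-memory limit
        }
        if (usedBytes.compareAndSet(cur, next)) {
          return next;                   // reservation recorded
        }
        // Another thread updated the counter first; retry with the
        // fresh value.
      }
    }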
@@ -195,10 +200,23 @@ public class FsDatasetCache {
* @return The new number of usedBytes.
*/
long release(long count) {
- count = rounder.round(count);
+ count = rounder.roundUp(count);
return usedBytes.addAndGet(-count);
}
-
+
+ /**
+ * Release some bytes that we're using rounded down to the page size.
+ *
+ * @param count The number of bytes to release. We will round this
+ * down to the page size.
+ *
+ * @return The new number of usedBytes.
+ */
+ long releaseRoundDown(long count) {
+ count = rounder.roundDown(count);
+ return usedBytes.addAndGet(-count);
+ }
+
long get() {
return usedBytes.get();
}
@@ -341,6 +359,52 @@ public class FsDatasetCache {
}
/**
+ * Try to reserve more bytes.
+ *
+ * @param count The number of bytes to add. We will round this
+ * up to the page size.
+ *
+ * @return The new number of usedBytes if we succeeded;
+ * -1 if we failed.
+ */
+ long reserve(long count) {
+ return usedBytesCount.reserve(count);
+ }
+
+ /**
+ * Release some bytes that we're using.
+ *
+ * @param count The number of bytes to release. We will round this
+ * up to the page size.
+ *
+ * @return The new number of usedBytes.
+ */
+ long release(long count) {
+ return usedBytesCount.release(count);
+ }
+
+ /**
+ * Release some bytes that we're using rounded down to the page size.
+ *
+ * @param count The number of bytes to release. We will round this
+ * down to the page size.
+ *
+ * @return The new number of usedBytes.
+ */
+ long releaseRoundDown(long count) {
+ return usedBytesCount.releaseRoundDown(count);
+ }
+
+ /**
+ * Get the OS page size.
+ *
+ * @return the OS page size.
+ */
+ long getOsPageSize() {
+ return usedBytesCount.rounder.osPageSize;
+ }
+
+ /**
* Background worker that mmaps, mlocks, and checksums a block
*/
private class CachingTask implements Runnable {
@@ -363,7 +427,7 @@ public class FsDatasetCache {
MappableBlock mappableBlock = null;
ExtendedBlock extBlk = new ExtendedBlock(key.getBlockPoolId(),
key.getBlockId(), length, genstamp);
- long newUsedBytes = usedBytesCount.reserve(length);
+ long newUsedBytes = reserve(length);
boolean reservedBytes = false;
try {
if (newUsedBytes < 0) {
@@ -423,7 +487,7 @@ public class FsDatasetCache {
IOUtils.closeQuietly(metaIn);
if (!success) {
if (reservedBytes) {
- usedBytesCount.release(length);
+ release(length);
}
LOG.debug("Caching of {} was aborted. We are now caching only {} "
+ "bytes in total.", key, usedBytesCount.get());
@@ -502,8 +566,7 @@ public class FsDatasetCache {
synchronized (FsDatasetCache.this) {
mappableBlockMap.remove(key);
}
- long newUsedBytes =
- usedBytesCount.release(value.mappableBlock.getLength());
+ long newUsedBytes = release(value.mappableBlock.getLength());
numBlocksCached.addAndGet(-1);
dataset.datanode.getMetrics().incrBlocksUncached(1);
if (revocationTimeMs != 0) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e453989a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 8725126..8ebd214 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -319,8 +319,18 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
cacheManager = new FsDatasetCache(this);
// Start the lazy writer once we have built the replica maps.
- lazyWriter = new Daemon(new LazyWriter(conf));
- lazyWriter.start();
+ // We need to start the lazy writer even if MaxLockedMemory is set to
+ // zero because we may have un-persisted replicas in memory from before
+ // the process restart. To minimize the chances of data loss we'll
+ // ensure they get written to disk now.
+ if (ramDiskReplicaTracker.numReplicasNotPersisted() > 0 ||
+ datanode.getDnConf().getMaxLockedMemory() > 0) {
+ lazyWriter = new Daemon(new LazyWriter(conf));
+ lazyWriter.start();
+ } else {
+ lazyWriter = null;
+ }
+
registerMBean(datanode.getDatanodeUuid());
// Add a Metrics2 Source Interface. This is same
@@ -1284,26 +1294,33 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
" and thus cannot be created.");
}
// create a new block
- FsVolumeReference ref;
- while (true) {
+ FsVolumeReference ref = null;
+
+ // Use ramdisk only if block size is a multiple of OS page size.
+ // This simplifies reservation for partially used replicas
+ // significantly.
+ if (allowLazyPersist &&
+ lazyWriter != null &&
+ b.getNumBytes() % cacheManager.getOsPageSize() == 0 &&
+ (cacheManager.reserve(b.getNumBytes())) > 0) {
try {
- if (allowLazyPersist) {
- // First try to place the block on a transient volume.
- ref = volumes.getNextTransientVolume(b.getNumBytes());
- datanode.getMetrics().incrRamDiskBlocksWrite();
- } else {
- ref = volumes.getNextVolume(storageType, b.getNumBytes());
- }
- } catch (DiskOutOfSpaceException de) {
- if (allowLazyPersist) {
- datanode.getMetrics().incrRamDiskBlocksWriteFallback();
- allowLazyPersist = false;
- continue;
+ // First try to place the block on a transient volume.
+ ref = volumes.getNextTransientVolume(b.getNumBytes());
+ datanode.getMetrics().incrRamDiskBlocksWrite();
+ } catch(DiskOutOfSpaceException de) {
+ // Ignore the exception since we just fall back to persistent storage.
+ datanode.getMetrics().incrRamDiskBlocksWriteFallback();
+ } finally {
+ if (ref == null) {
+ cacheManager.release(b.getNumBytes());
}
- throw de;
}
- break;
}
+
+ if (ref == null) {
+ ref = volumes.getNextVolume(storageType, b.getNumBytes());
+ }
+
FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
// create an rbw file to hold block in the designated volume
File f;
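Because the interleaved add/remove lines above are hard to follow, here is the resulting block-placement logic in one piece (a paraphrase of the post-patch code, not an exact excerpt):

    // Sketch: RAM disk is used only when lazy persist is requested, the
    // lazy writer is running, the block size is page-aligned, and the
    // locked-memory reservation succeeds; anything else falls back to
    // regular persistent storage.
    FsVolumeReference ref = null;
    if (allowLazyPersist && lazyWriter != null
        && b.getNumBytes() % cacheManager.getOsPageSize() == 0
        && cacheManager.reserve(b.getNumBytes()) > 0) {
      try {
        ref = volumes.getNextTransientVolume(b.getNumBytes());
        datanode.getMetrics().incrRamDiskBlocksWrite();
      } catch (DiskOutOfSpaceException de) {
        datanode.getMetrics().incrRamDiskBlocksWriteFallback();
      } finally {
        if (ref == null) {
          cacheManager.release(b.getNumBytes()); // undo unused reservation
        }
      }
    }
    if (ref == null) {
      ref = volumes.getNextVolume(storageType, b.getNumBytes());
    }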
@@ -1564,7 +1581,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile());
if (v.isTransientStorage()) {
- ramDiskReplicaTracker.addReplica(bpid, replicaInfo.getBlockId(), v);
+ releaseLockedMemory(
+ replicaInfo.getOriginalBytesReserved() - replicaInfo.getNumBytes(),
+ false);
+ ramDiskReplicaTracker.addReplica(
+ bpid, replicaInfo.getBlockId(), v, replicaInfo.getNumBytes());
datanode.getMetrics().addRamDiskBytesWrite(replicaInfo.getNumBytes());
}
}
@@ -1811,9 +1832,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
/**
- * We're informed that a block is no longer valid. We
- * could lazily garbage-collect the block, but why bother?
- * just get rid of it.
+ * We're informed that a block is no longer valid. Delete it.
*/
@Override // FsDatasetSpi
public void invalidate(String bpid, Block invalidBlks[]) throws IOException {
@@ -2064,8 +2083,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
public void shutdown() {
fsRunning = false;
- ((LazyWriter) lazyWriter.getRunnable()).stop();
- lazyWriter.interrupt();
+ if (lazyWriter != null) {
+ ((LazyWriter) lazyWriter.getRunnable()).stop();
+ lazyWriter.interrupt();
+ }
if (mbeanName != null) {
MBeans.unregister(mbeanName);
@@ -2083,11 +2104,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
volumes.shutdown();
}
- try {
- lazyWriter.join();
- } catch (InterruptedException ie) {
- LOG.warn("FsDatasetImpl.shutdown ignoring InterruptedException " +
- "from LazyWriter.join");
+ if (lazyWriter != null) {
+ try {
+ lazyWriter.join();
+ } catch (InterruptedException ie) {
+ LOG.warn("FsDatasetImpl.shutdown ignoring InterruptedException " +
+ "from LazyWriter.join");
+ }
}
}
@@ -2173,7 +2196,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
diskFile.length(), diskGS, vol, diskFile.getParentFile());
volumeMap.add(bpid, diskBlockInfo);
if (vol.isTransientStorage()) {
- ramDiskReplicaTracker.addReplica(bpid, blockId, (FsVolumeImpl) vol);
+ long lockedBytesReserved =
+ cacheManager.reserve(diskBlockInfo.getNumBytes()) > 0 ?
+ diskBlockInfo.getNumBytes() : 0;
+ ramDiskReplicaTracker.addReplica(
+ bpid, blockId, (FsVolumeImpl) vol, lockedBytesReserved);
}
LOG.warn("Added missing block to memory " + diskBlockInfo);
return;
@@ -2760,12 +2787,14 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
boolean ramDiskConfigured = ramDiskConfigured();
// Add thread for DISK volume if RamDisk is configured
if (ramDiskConfigured &&
+ asyncLazyPersistService != null &&
!asyncLazyPersistService.queryVolume(v.getCurrentDir())) {
asyncLazyPersistService.addVolume(v.getCurrentDir());
}
// Remove thread for DISK volume if RamDisk is not configured
if (!ramDiskConfigured &&
+ asyncLazyPersistService != null &&
asyncLazyPersistService.queryVolume(v.getCurrentDir())) {
asyncLazyPersistService.removeVolume(v.getCurrentDir());
}
@@ -2790,9 +2819,10 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
// Remove the old replicas
if (blockFile.delete() || !blockFile.exists()) {
- ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(bpid, blockFileUsed);
+ FsVolumeImpl volume = (FsVolumeImpl) replicaInfo.getVolume();
+ volume.onBlockFileDeletion(bpid, blockFileUsed);
if (metaFile.delete() || !metaFile.exists()) {
- ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(bpid, metaFileUsed);
+ volume.onMetaFileDeletion(bpid, metaFileUsed);
}
}
@@ -2905,8 +2935,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
/**
- * Attempt to evict one or more transient block replicas we have at least
- * spaceNeeded bytes free.
+ * Attempt to evict one or more transient block replicas until we
+ * have at least spaceNeeded bytes free.
*/
private void evictBlocks() throws IOException {
int iterations = 0;
@@ -3056,5 +3086,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
s.add(blockId);
}
}
+
+ void releaseLockedMemory(long count, boolean roundup) {
+ if (roundup) {
+ cacheManager.release(count);
+ } else {
+ cacheManager.releaseRoundDown(count);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e453989a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index bc96a02..49a56bb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -274,7 +274,18 @@ public class FsVolumeImpl implements FsVolumeSpi {
return getBlockPoolSlice(bpid).getTmpDir();
}
- void decDfsUsed(String bpid, long value) {
+ void onBlockFileDeletion(String bpid, long value) {
+ decDfsUsed(bpid, value);
+ if (isTransientStorage()) {
+ dataset.releaseLockedMemory(value, true);
+ }
+ }
+
+ void onMetaFileDeletion(String bpid, long value) {
+ decDfsUsed(bpid, value);
+ }
+
+ private void decDfsUsed(String bpid, long value) {
synchronized(dataset) {
BlockPoolSlice bp = bpSlices.get(bpid);
if (bp != null) {
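One point the patch leaves implicit: only block file bytes are backed by a locked-memory reservation on RAM disk, so deleting a meta file adjusts dfsUsed only. Restated with explanatory comments (the comments are an interpretation, not from the patch):

    void onBlockFileDeletion(String bpid, long value) {
      decDfsUsed(bpid, value);       // space accounting, all volume types
      if (isTransientStorage()) {
        // The block data held a locked-memory reservation; release it,
        // rounding UP to mirror the round-up applied at reserve time.
        dataset.releaseLockedMemory(value, true);
      }
    }

    void onMetaFileDeletion(String bpid, long value) {
      decDfsUsed(bpid, value);       // meta files reserve no locked memory
    }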
@@ -428,6 +439,13 @@ public class FsVolumeImpl implements FsVolumeSpi {
}
}
+ @Override
+ public void releaseLockedMemory(long bytesToRelease) {
+ if (isTransientStorage()) {
+ dataset.releaseLockedMemory(bytesToRelease, false);
+ }
+ }
+
private enum SubdirFilter implements FilenameFilter {
INSTANCE;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e453989a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java
index c01a6cf..b940736 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java
@@ -38,8 +38,10 @@ public class RamDiskReplicaLruTracker extends RamDiskReplicaTracker {
private class RamDiskReplicaLru extends RamDiskReplica {
long lastUsedTime;
- private RamDiskReplicaLru(String bpid, long blockId, FsVolumeImpl ramDiskVolume) {
- super(bpid, blockId, ramDiskVolume);
+ private RamDiskReplicaLru(String bpid, long blockId,
+ FsVolumeImpl ramDiskVolume,
+ long lockedBytesReserved) {
+ super(bpid, blockId, ramDiskVolume, lockedBytesReserved);
}
@Override
@@ -70,20 +72,23 @@ public class RamDiskReplicaLruTracker extends RamDiskReplicaTracker {
TreeMultimap<Long, RamDiskReplicaLru> replicasPersisted;
RamDiskReplicaLruTracker() {
- replicaMaps = new HashMap<String, Map<Long, RamDiskReplicaLru>>();
- replicasNotPersisted = new LinkedList<RamDiskReplicaLru>();
+ replicaMaps = new HashMap<>();
+ replicasNotPersisted = new LinkedList<>();
replicasPersisted = TreeMultimap.create();
}
@Override
synchronized void addReplica(final String bpid, final long blockId,
- final FsVolumeImpl transientVolume) {
+ final FsVolumeImpl transientVolume,
+ long lockedBytesReserved) {
Map<Long, RamDiskReplicaLru> map = replicaMaps.get(bpid);
if (map == null) {
- map = new HashMap<Long, RamDiskReplicaLru>();
+ map = new HashMap<>();
replicaMaps.put(bpid, map);
}
- RamDiskReplicaLru ramDiskReplicaLru = new RamDiskReplicaLru(bpid, blockId, transientVolume);
+ RamDiskReplicaLru ramDiskReplicaLru =
+ new RamDiskReplicaLru(bpid, blockId, transientVolume,
+ lockedBytesReserved);
map.put(blockId, ramDiskReplicaLru);
replicasNotPersisted.add(ramDiskReplicaLru);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e453989a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaTracker.java
index 7507925..335ed70 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaTracker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaTracker.java
@@ -45,6 +45,7 @@ public abstract class RamDiskReplicaTracker {
private final long blockId;
private File savedBlockFile;
private File savedMetaFile;
+ private long lockedBytesReserved;
private long creationTime;
protected AtomicLong numReads = new AtomicLong(0);
@@ -61,10 +62,12 @@ public abstract class RamDiskReplicaTracker {
FsVolumeImpl lazyPersistVolume;
RamDiskReplica(final String bpid, final long blockId,
- final FsVolumeImpl ramDiskVolume) {
+ final FsVolumeImpl ramDiskVolume,
+ long lockedBytesReserved) {
this.bpid = bpid;
this.blockId = blockId;
this.ramDiskVolume = ramDiskVolume;
+ this.lockedBytesReserved = lockedBytesReserved;
lazyPersistVolume = null;
savedMetaFile = null;
savedBlockFile = null;
@@ -168,6 +171,10 @@ public abstract class RamDiskReplicaTracker {
public String toString() {
return "[BlockPoolID=" + bpid + "; BlockId=" + blockId + "]";
}
+
+ public long getLockedBytesReserved() {
+ return lockedBytesReserved;
+ }
}
/**
@@ -201,7 +208,8 @@ public abstract class RamDiskReplicaTracker {
* @param transientVolume RAM disk volume that stores the replica.
*/
abstract void addReplica(final String bpid, final long blockId,
- final FsVolumeImpl transientVolume);
+ final FsVolumeImpl transientVolume,
+ long lockedBytesReserved);
/**
* Invoked when a replica is opened by a client. This may be used as
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e453989a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index 12ad23e..fdbacdc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -1582,7 +1582,7 @@ public class MiniDFSCluster {
throw new IllegalStateException("Attempting to finalize "
+ "Namenode but it is not running");
}
- ToolRunner.run(new DFSAdmin(conf), new String[] {"-finalizeUpgrade"});
+ ToolRunner.run(new DFSAdmin(conf), new String[]{"-finalizeUpgrade"});
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e453989a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index e756f0b..92d31d0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -66,6 +66,7 @@ import org.apache.hadoop.hdfs.server.balancer.Balancer.Parameters;
import org.apache.hadoop.hdfs.server.balancer.Balancer.Result;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
@@ -120,13 +121,16 @@ public class TestBalancer {
conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L);
}
- static void initConfWithRamDisk(Configuration conf) {
+ static void initConfWithRamDisk(Configuration conf,
+ long ramDiskCapacity) {
conf.setLong(DFS_BLOCK_SIZE_KEY, DEFAULT_RAM_DISK_BLOCK_SIZE);
+ conf.setLong(DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, ramDiskCapacity);
conf.setInt(DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC, 3);
conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500);
conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC, 1);
conf.setInt(DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES, DEFAULT_RAM_DISK_BLOCK_SIZE);
+ LazyPersistTestCase.initCacheManipulator();
}
/* create a file with a length of <code>fileLen</code> */
@@ -1245,7 +1249,6 @@ public class TestBalancer {
final int SEED = 0xFADED;
final short REPL_FACT = 1;
Configuration conf = new Configuration();
- initConfWithRamDisk(conf);
final int defaultRamDiskCapacity = 10;
final long ramDiskStorageLimit =
@@ -1255,6 +1258,8 @@ public class TestBalancer {
((long) defaultRamDiskCapacity * DEFAULT_RAM_DISK_BLOCK_SIZE) +
(DEFAULT_RAM_DISK_BLOCK_SIZE - 1);
+ initConfWithRamDisk(conf, ramDiskStorageLimit);
+
cluster = new MiniDFSCluster
.Builder(conf)
.numDataNodes(1)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e453989a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 2ac9416..778dd28 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -492,6 +492,10 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
}
@Override
+ public void releaseLockedMemory(long bytesToRelease) {
+ }
+
+ @Override
public void releaseReservedSpace(long bytesToRelease) {
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e453989a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
index b225e35..9b942b7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Test;
@@ -79,6 +80,8 @@ public class TestDirectoryScanner {
CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_LENGTH);
CONF.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 1);
CONF.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
+ CONF.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
+ Long.MAX_VALUE);
}
/** create a file with a length of <code>fileLen</code> */
@@ -308,6 +311,7 @@ public class TestDirectoryScanner {
@Test (timeout=300000)
public void testRetainBlockOnPersistentStorage() throws Exception {
+ LazyPersistTestCase.initCacheManipulator();
cluster = new MiniDFSCluster
.Builder(CONF)
.storageTypes(new StorageType[] { StorageType.RAM_DISK, StorageType.DEFAULT })
@@ -349,6 +353,7 @@ public class TestDirectoryScanner {
@Test (timeout=300000)
public void testDeleteBlockOnTransientStorage() throws Exception {
+ LazyPersistTestCase.initCacheManipulator();
cluster = new MiniDFSCluster
.Builder(CONF)
.storageTypes(new StorageType[] { StorageType.RAM_DISK, StorageType.DEFAULT })
@@ -615,6 +620,10 @@ public class TestDirectoryScanner {
}
@Override
+ public void releaseLockedMemory(long bytesToRelease) {
+ }
+
+ @Override
public BlockIterator newBlockIterator(String bpid, String name) {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e453989a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
index 7a09630..58932fb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
@@ -339,7 +339,7 @@ public class TestFsDatasetCache {
for (int i=0; i<numFiles-1; i++) {
setHeartbeatResponse(cacheBlocks(fileLocs[i]));
total = DFSTestUtil.verifyExpectedCacheUsage(
- rounder.round(total + fileSizes[i]), 4 * (i + 1), fsd);
+ rounder.roundUp(total + fileSizes[i]), 4 * (i + 1), fsd);
}
// nth file should hit a capacity exception
@@ -365,7 +365,7 @@ public class TestFsDatasetCache {
int curCachedBlocks = 16;
for (int i=0; i<numFiles-1; i++) {
setHeartbeatResponse(uncacheBlocks(fileLocs[i]));
- long uncachedBytes = rounder.round(fileSizes[i]);
+ long uncachedBytes = rounder.roundUp(fileSizes[i]);
total -= uncachedBytes;
curCachedBlocks -= uncachedBytes / BLOCK_SIZE;
DFSTestUtil.verifyExpectedCacheUsage(total, curCachedBlocks, fsd);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e453989a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
index ea9e4c1..3242ff7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
@@ -82,6 +82,10 @@ public class ExternalVolumeImpl implements FsVolumeSpi {
}
@Override
+ public void releaseLockedMemory(long bytesToRelease) {
+ }
+
+ @Override
public BlockIterator newBlockIterator(String bpid, String name) {
return null;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e453989a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java
index 5dc86f7..5ce5cc6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyPersistTestCase.java
@@ -23,16 +23,7 @@ import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
import static org.apache.hadoop.fs.StorageType.DEFAULT;
import static org.apache.hadoop.fs.StorageType.RAM_DISK;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADERLOCAL;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
@@ -40,6 +31,7 @@ import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashSet;
@@ -68,6 +60,7 @@ import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.tools.JMXGet;
+import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
@@ -80,8 +73,8 @@ public abstract class LazyPersistTestCase {
static final byte LAZY_PERSIST_POLICY_ID = (byte) 15;
static {
- DFSTestUtil.setNameNodeLogLevel(Level.ALL);
- GenericTestUtils.setLogLevel(FsDatasetImpl.LOG, Level.ALL);
+ DFSTestUtil.setNameNodeLogLevel(Level.DEBUG);
+ GenericTestUtils.setLogLevel(FsDatasetImpl.LOG, Level.DEBUG);
}
protected static final int BLOCK_SIZE = 5 * 1024 * 1024;
@@ -95,6 +88,8 @@ public abstract class LazyPersistTestCase {
protected static final int LAZY_WRITER_INTERVAL_SEC = 1;
protected static final Log LOG = LogFactory.getLog(LazyPersistTestCase.class);
protected static final short REPL_FACTOR = 1;
+ protected final long osPageSize =
+ NativeIO.POSIX.getCacheManipulator().getOperatingSystemPageSize();
protected MiniDFSCluster cluster;
protected DistributedFileSystem fs;
@@ -194,7 +189,7 @@ public abstract class LazyPersistTestCase {
protected final void makeRandomTestFile(Path path, long length,
boolean isLazyPersist, long seed) throws IOException {
DFSTestUtil.createFile(fs, path, isLazyPersist, BUFFER_LENGTH, length,
- BLOCK_SIZE, REPL_FACTOR, seed, true);
+ BLOCK_SIZE, REPL_FACTOR, seed, true);
}
protected final void makeTestFile(Path path, long length,
@@ -242,10 +237,12 @@ public abstract class LazyPersistTestCase {
int ramDiskReplicaCapacity,
long ramDiskStorageLimit,
long evictionLowWatermarkReplicas,
+ long maxLockedMemory,
boolean useSCR,
boolean useLegacyBlockReaderLocal,
boolean disableScrubber) throws IOException {
+ initCacheManipulator();
Configuration conf = new Configuration();
conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
if (disableScrubber) {
@@ -262,6 +259,7 @@ public abstract class LazyPersistTestCase {
conf.setLong(DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES,
evictionLowWatermarkReplicas * BLOCK_SIZE);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY, 1);
+ conf.setLong(DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, maxLockedMemory);
if (useSCR) {
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, true);
@@ -311,6 +309,31 @@ public abstract class LazyPersistTestCase {
LOG.info("Cluster startup complete");
}
+ /**
+ * Use a dummy cache manipulator for testing.
+ */
+ public static void initCacheManipulator() {
+ NativeIO.POSIX.setCacheManipulator(new NativeIO.POSIX.CacheManipulator() {
+ @Override
+ public void mlock(String identifier,
+ ByteBuffer mmap, long length) throws IOException {
+ LOG.info("LazyPersistTestCase: faking mlock of " + identifier + " bytes.");
+ }
+
+ @Override
+ public long getMemlockLimit() {
+ LOG.info("LazyPersistTestCase: fake return " + Long.MAX_VALUE);
+ return Long.MAX_VALUE;
+ }
+
+ @Override
+ public boolean verifyCanMlock() {
+ LOG.info("LazyPersistTestCase: fake return " + true);
+ return true;
+ }
+ });
+ }
+
ClusterWithRamDiskBuilder getClusterBuilder() {
return new ClusterWithRamDiskBuilder();
}
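For context, tests install this fake before cluster startup so that locked-memory reservations succeed without real mlock privileges or a raised ulimit. A typical (hypothetical) call sequence, mirroring the TestDirectoryScanner and TestBalancer changes in this commit:

    // Sketch: wire up the fake manipulator and allow a generous
    // locked-memory budget before starting the mini cluster.
    LazyPersistTestCase.initCacheManipulator();
    Configuration conf = new Configuration();
    conf.setLong(DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, Long.MAX_VALUE);
    // A MiniDFSCluster built with this conf logs fake mlock calls
    // instead of pinning pages.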
@@ -344,6 +367,11 @@ public abstract class LazyPersistTestCase {
return this;
}
+ public ClusterWithRamDiskBuilder setMaxLockedMemory(long maxLockedMemory) {
+ this.maxLockedMemory = maxLockedMemory;
+ return this;
+ }
+
public ClusterWithRamDiskBuilder setUseScr(boolean useScr) {
this.useScr = useScr;
return this;
@@ -376,13 +404,14 @@ public abstract class LazyPersistTestCase {
LazyPersistTestCase.this.startUpCluster(
numDatanodes, hasTransientStorage, storageTypes, ramDiskReplicaCapacity,
ramDiskStorageLimit, evictionLowWatermarkReplicas,
- useScr, useLegacyBlockReaderLocal,disableScrubber);
+ maxLockedMemory, useScr, useLegacyBlockReaderLocal, disableScrubber);
}
private int numDatanodes = REPL_FACTOR;
private StorageType[] storageTypes = null;
private int ramDiskReplicaCapacity = -1;
private long ramDiskStorageLimit = -1;
+ private long maxLockedMemory = Long.MAX_VALUE;
private boolean hasTransientStorage = true;
private boolean useScr = false;
private boolean useLegacyBlockReaderLocal = false;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e453989a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistLockedMemory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistLockedMemory.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistLockedMemory.java
new file mode 100644
index 0000000..9ea4665
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistLockedMemory.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+
+import com.google.common.base.Supplier;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSOutputStream;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.MetricsAsserts;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.hadoop.fs.CreateFlag.CREATE;
+import static org.apache.hadoop.fs.CreateFlag.LAZY_PERSIST;
+import static org.apache.hadoop.fs.StorageType.DEFAULT;
+import static org.apache.hadoop.fs.StorageType.RAM_DISK;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Verify that locked memory is used correctly when writing to replicas in
+ * memory.
+ */
+public class TestLazyPersistLockedMemory extends LazyPersistTestCase {
+
+ /**
+ * RAM disk present but locked memory is set to zero. Placement should
+ * fall back to disk.
+ */
+ @Test
+ public void testWithNoLockedMemory() throws IOException {
+ getClusterBuilder().setNumDatanodes(1)
+ .setMaxLockedMemory(0).build();
+
+ final String METHOD_NAME = GenericTestUtils.getMethodName();
+ Path path = new Path("/" + METHOD_NAME + ".dat");
+ makeTestFile(path, BLOCK_SIZE, true);
+ ensureFileReplicasOnStorageType(path, DEFAULT);
+ }
+
+ @Test
+ public void testReservation()
+ throws IOException, TimeoutException, InterruptedException {
+ getClusterBuilder().setNumDatanodes(1)
+ .setMaxLockedMemory(BLOCK_SIZE).build();
+ final String METHOD_NAME = GenericTestUtils.getMethodName();
+ final FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset();
+
+ // Create a file and ensure the replica in RAM_DISK uses locked memory.
+ Path path = new Path("/" + METHOD_NAME + ".dat");
+ makeTestFile(path, BLOCK_SIZE, true);
+ ensureFileReplicasOnStorageType(path, RAM_DISK);
+ assertThat(fsd.getCacheUsed(), is((long) BLOCK_SIZE));
+ }
+
+ @Test
+ public void testReleaseOnFileDeletion()
+ throws IOException, TimeoutException, InterruptedException {
+ getClusterBuilder().setNumDatanodes(1)
+ .setMaxLockedMemory(BLOCK_SIZE).build();
+ final String METHOD_NAME = GenericTestUtils.getMethodName();
+ final FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset();
+
+ Path path = new Path("/" + METHOD_NAME + ".dat");
+ makeTestFile(path, BLOCK_SIZE, true);
+ ensureFileReplicasOnStorageType(path, RAM_DISK);
+ assertThat(fsd.getCacheUsed(), is((long) BLOCK_SIZE));
+
+ // Delete the file and ensure that the locked memory is released.
+ fs.delete(path, false);
+ DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0));
+ waitForLockedBytesUsed(fsd, 0);
+ }
+
+ /**
+ * Verify that locked RAM is released when blocks are evicted from RAM disk.
+ */
+ @Test
+ public void testReleaseOnEviction()
+ throws IOException, TimeoutException, InterruptedException {
+ getClusterBuilder().setNumDatanodes(1)
+ .setMaxLockedMemory(BLOCK_SIZE)
+ .setRamDiskReplicaCapacity(BLOCK_SIZE * 2 - 1)
+ .build();
+ final String METHOD_NAME = GenericTestUtils.getMethodName();
+ final FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset();
+
+ Path path = new Path("/" + METHOD_NAME + ".dat");
+ makeTestFile(path, BLOCK_SIZE, true);
+
+ // The block should get evicted soon since it pushes RAM disk free
+ // space below the threshold.
+ waitForLockedBytesUsed(fsd, 0);
+
+ MetricsRecordBuilder rb =
+ MetricsAsserts.getMetrics(cluster.getDataNodes().get(0).getMetrics().name());
+ MetricsAsserts.assertCounter("RamDiskBlocksEvicted", 1L, rb);
+ }
+
+ /**
+ * Verify that locked bytes are correctly updated when a block is finalized
+ * at less than its max length.
+ */
+ @Test
+ public void testShortBlockFinalized()
+ throws IOException, TimeoutException, InterruptedException {
+ getClusterBuilder().setNumDatanodes(1).build();
+ final String METHOD_NAME = GenericTestUtils.getMethodName();
+ final FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset();
+
+ Path path = new Path("/" + METHOD_NAME + ".dat");
+ makeTestFile(path, 1, true);
+ assertThat(fsd.getCacheUsed(), is(osPageSize));
+
+ // Delete the file and ensure locked RAM usage goes to zero.
+ fs.delete(path, false);
+ waitForLockedBytesUsed(fsd, 0);
+ }
+
+ /**
+ * Verify that locked bytes are correctly updated when the client goes
+ * away unexpectedly during a write.
+ */
+ @Test
+ public void testWritePipelineFailure()
+ throws IOException, TimeoutException, InterruptedException {
+ getClusterBuilder().setNumDatanodes(1).build();
+ final String METHOD_NAME = GenericTestUtils.getMethodName();
+ final FsDatasetSpi<?> fsd = cluster.getDataNodes().get(0).getFSDataset();
+
+ Path path = new Path("/" + METHOD_NAME + ".dat");
+
+ EnumSet<CreateFlag> createFlags = EnumSet.of(CREATE, LAZY_PERSIST);
+ // Write 1 byte to the file and kill the writer.
+ final FSDataOutputStream fos =
+ fs.create(path,
+ FsPermission.getFileDefault(),
+ createFlags,
+ BUFFER_LENGTH,
+ REPL_FACTOR,
+ BLOCK_SIZE,
+ null);
+
+ fos.write(new byte[1]);
+ fos.hsync();
+ DFSTestUtil.abortStream((DFSOutputStream) fos.getWrappedStream());
+ waitForLockedBytesUsed(fsd, osPageSize);
+
+ // Delete the file and ensure locked RAM goes to zero.
+ fs.delete(path, false);
+ DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(0));
+ waitForLockedBytesUsed(fsd, 0);
+ }
+
+ /**
+ * Wait until used locked byte count goes to the expected value.
+ * @throws TimeoutException after 300 seconds.
+ */
+ private void waitForLockedBytesUsed(final FsDatasetSpi<?> fsd,
+ final long expectedLockedBytes)
+ throws TimeoutException, InterruptedException {
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ long cacheUsed = fsd.getCacheUsed();
+ LOG.info("cacheUsed=" + cacheUsed + ", waiting for it to be " + expectedLockedBytes);
+ if (cacheUsed < 0) {
+ throw new IllegalStateException("cacheUsed unexpectedly negative");
+ }
+ return (cacheUsed == expectedLockedBytes);
+ }
+ }, 1000, 300000);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e453989a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
index d5664cf..a77184b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
@@ -204,7 +204,7 @@ public class TestWriteToReplica {
long available = v.getCapacity()-v.getDfsUsed();
long expectedLen = blocks[FINALIZED].getNumBytes();
try {
- v.decDfsUsed(bpid, -available);
+ v.onBlockFileDeletion(bpid, -available);
blocks[FINALIZED].setNumBytes(expectedLen+100);
dataSet.append(blocks[FINALIZED], newGS, expectedLen);
Assert.fail("Should not have space to append to an RWR replica" + blocks[RWR]);
@@ -212,7 +212,7 @@ public class TestWriteToReplica {
Assert.assertTrue(e.getMessage().startsWith(
"Insufficient space for appending to "));
}
- v.decDfsUsed(bpid, available);
+ v.onBlockFileDeletion(bpid, available);
blocks[FINALIZED].setNumBytes(expectedLen);
newGS = blocks[RBW].getGenerationStamp()+1;