You are viewing a plain-text version of this content. (The canonical link that appeared here on the original page was not preserved in this copy.)
Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2014/10/01 18:06:09 UTC
[18/50] [abbrv] git commit: HDFS-7100. Make eviction scheme pluggable. (Arpit Agarwal)
HDFS-7100. Make eviction scheme pluggable. (Arpit Agarwal)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b2d5ed36
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b2d5ed36
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b2d5ed36
Branch: refs/heads/trunk
Commit: b2d5ed36bcb80e2581191dcdc3976e825c959142
Parents: 09dab88
Author: arp <ar...@apache.org>
Authored: Sat Sep 20 13:25:23 2014 -0700
Committer: arp <ar...@apache.org>
Committed: Sat Sep 20 13:25:23 2014 -0700
----------------------------------------------------------------------
.../hadoop-hdfs/CHANGES-HDFS-6581.txt | 2 +
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 3 +
.../hadoop/hdfs/server/datanode/DataNode.java | 3 +-
.../datanode/fsdataset/impl/BlockPoolSlice.java | 11 +-
.../datanode/fsdataset/impl/FsDatasetImpl.java | 97 ++++---
.../datanode/fsdataset/impl/FsVolumeImpl.java | 14 +-
.../datanode/fsdataset/impl/FsVolumeList.java | 4 +-
.../fsdataset/impl/LazyWriteReplicaTracker.java | 268 -------------------
.../impl/RamDiskReplicaLruTracker.java | 208 ++++++++++++++
.../fsdataset/impl/RamDiskReplicaTracker.java | 245 +++++++++++++++++
.../fsdataset/impl/FsDatasetTestUtil.java | 2 +-
.../fsdataset/impl/TestLazyPersistFiles.java | 47 ++--
12 files changed, 558 insertions(+), 346 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2d5ed36/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
index a2f4b30..c7045c2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
@@ -60,4 +60,6 @@
HDFS-7091. Add forwarding constructor for INodeFile for existing callers.
(Arpit Agarwal)
+ HDFS-7100. Make eviction scheme pluggable. (Arpit Agarwal)
+
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2d5ed36/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index ff7055f..ce635ef 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaLruTracker;
import org.apache.hadoop.hdfs.web.AuthFilter;
import org.apache.hadoop.http.HttpConfig;
@@ -129,6 +130,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_DEFAULT = 4;
public static final String DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC = "dfs.datanode.lazywriter.interval.sec";
public static final int DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC = 60;
+ public static final String DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_KEY = "dfs.datanode.ram.disk.replica.tracker";
+ public static final Class<RamDiskReplicaLruTracker> DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_DEFAULT = RamDiskReplicaLruTracker.class;
public static final String DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT = "dfs.datanode.ram.disk.low.watermark.percent";
public static final int DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT_DEFAULT = 10;
public static final String DFS_DATANODE_RAM_DISK_LOW_WATERMARK_REPLICAS = "dfs.datanode.ram.disk.low.watermark.replicas";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2d5ed36/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 35fc821..b4fca82 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -2041,7 +2041,8 @@ public class DataNode extends ReconfigurableBase
LOG.warn("Cannot find BPOfferService for reporting block received for bpid="
+ block.getBlockPoolId());
}
- if (blockScanner != null) {
+ FsVolumeSpi volume = getFSDataset().getVolume(block);
+ if (blockScanner != null && !volume.isTransientStorage()) {
blockScanner.addBlock(block);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2d5ed36/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index a4bcc3e..2ee16f6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -286,9 +286,9 @@ class BlockPoolSlice {
* Move a persisted replica from lazypersist directory to a subdirectory
* under finalized.
*/
- File activateSavedReplica(Block b, File blockFile) throws IOException {
+ File activateSavedReplica(Block b, File metaFile, File blockFile)
+ throws IOException {
final File blockDir = DatanodeUtil.idToBlockDir(finalizedDir, b.getBlockId());
- final File metaFile = FsDatasetUtil.getMetaFile(blockFile, b.getGenerationStamp());
final File targetBlockFile = new File(blockDir, blockFile.getName());
final File targetMetaFile = new File(blockDir, metaFile.getName());
FileUtils.moveFile(blockFile, targetBlockFile);
@@ -307,7 +307,7 @@ class BlockPoolSlice {
void getVolumeMap(ReplicaMap volumeMap,
- final LazyWriteReplicaTracker lazyWriteReplicaMap)
+ final RamDiskReplicaTracker lazyWriteReplicaMap)
throws IOException {
// Recover lazy persist replicas, they will be added to the volumeMap
// when we scan the finalized directory.
@@ -404,7 +404,7 @@ class BlockPoolSlice {
* false if the directory has rbw replicas
*/
void addToReplicasMap(ReplicaMap volumeMap, File dir,
- final LazyWriteReplicaTracker lazyWriteReplicaMap,
+ final RamDiskReplicaTracker lazyWriteReplicaMap,
boolean isFinalized)
throws IOException {
File files[] = FileUtil.listFiles(dir);
@@ -481,7 +481,8 @@ class BlockPoolSlice {
// it is in the lazyWriteReplicaMap so it can be persisted
// eventually.
if (newReplica.getVolume().isTransientStorage()) {
- lazyWriteReplicaMap.addReplica(bpid, blockId, newReplica.getVolume());
+ lazyWriteReplicaMap.addReplica(bpid, blockId,
+ (FsVolumeImpl) newReplica.getVolume());
} else {
lazyWriteReplicaMap.discardReplica(bpid, blockId, false);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2d5ed36/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index a29f5e6..be72266 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -30,7 +30,6 @@ import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -87,6 +86,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
+import static org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTracker.RamDiskReplica;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -158,7 +158,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override // FsDatasetSpi
public synchronized Block getStoredBlock(String bpid, long blkid)
throws IOException {
- File blockfile = getFile(bpid, blkid);
+ File blockfile = getFile(bpid, blkid, false);
if (blockfile == null) {
return null;
}
@@ -218,7 +218,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
private volatile boolean fsRunning;
final ReplicaMap volumeMap;
- final LazyWriteReplicaTracker lazyWriteReplicaTracker;
+ final RamDiskReplicaTracker ramDiskReplicaTracker;
private static final int MAX_BLOCK_EVICTIONS_PER_ITERATION = 3;
@@ -262,7 +262,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
storageMap = new ConcurrentHashMap<String, DatanodeStorage>();
volumeMap = new ReplicaMap(this);
- lazyWriteReplicaTracker = new LazyWriteReplicaTracker(this);
+ ramDiskReplicaTracker = RamDiskReplicaTracker.getInstance(conf, this);
@SuppressWarnings("unchecked")
final VolumeChoosingPolicy<FsVolumeImpl> blockChooserImpl =
@@ -297,7 +297,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
FsVolumeImpl fsVolume = FsVolumeImplAllocator.createVolume(
this, sd.getStorageUuid(), dir, this.conf, storageType);
ReplicaMap tempVolumeMap = new ReplicaMap(this);
- fsVolume.getVolumeMap(tempVolumeMap, lazyWriteReplicaTracker);
+ fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker);
volumeMap.addAll(tempVolumeMap);
volumes.addVolume(fsVolume);
@@ -325,7 +325,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
for (final String bpid : bpids) {
try {
fsVolume.addBlockPool(bpid, this.conf);
- fsVolume.getVolumeMap(bpid, tempVolumeMap, lazyWriteReplicaTracker);
+ fsVolume.getVolumeMap(bpid, tempVolumeMap, ramDiskReplicaTracker);
} catch (IOException e) {
LOG.warn("Caught exception when adding " + fsVolume +
". Will throw later.", e);
@@ -585,12 +585,16 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
* checking that it exists. This should be used when the
* next operation is going to open the file for read anyway,
* and thus the exists check is redundant.
+ *
+ * @param touch if true then update the last access timestamp of the
+ * block. Currently used for blocks on transient storage.
*/
- private File getBlockFileNoExistsCheck(ExtendedBlock b)
+ private File getBlockFileNoExistsCheck(ExtendedBlock b,
+ boolean touch)
throws IOException {
final File f;
synchronized(this) {
- f = getFile(b.getBlockPoolId(), b.getLocalBlock().getBlockId());
+ f = getFile(b.getBlockPoolId(), b.getLocalBlock().getBlockId(), touch);
}
if (f == null) {
throw new IOException("Block " + b + " is not valid");
@@ -601,7 +605,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override // FsDatasetSpi
public InputStream getBlockInputStream(ExtendedBlock b,
long seekOffset) throws IOException {
- File blockFile = getBlockFileNoExistsCheck(b);
+ File blockFile = getBlockFileNoExistsCheck(b, true);
if (isNativeIOAvailable) {
return NativeIO.getShareDeleteFileInputStream(blockFile, seekOffset);
} else {
@@ -1239,7 +1243,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile());
if (v.isTransientStorage()) {
- lazyWriteReplicaTracker.addReplica(bpid, replicaInfo.getBlockId(), v);
+ ramDiskReplicaTracker.addReplica(bpid, replicaInfo.getBlockId(), v);
}
}
volumeMap.add(bpid, newReplicaInfo);
@@ -1264,7 +1268,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
LOG.warn("Block " + b + " unfinalized and removed. " );
}
if (replicaInfo.getVolume().isTransientStorage()) {
- lazyWriteReplicaTracker.discardReplica(b.getBlockPoolId(), b.getBlockId(), true);
+ ramDiskReplicaTracker.discardReplica(b.getBlockPoolId(), b.getBlockId(), true);
}
}
}
@@ -1410,7 +1414,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
//Should we check for metadata file too?
final File f;
synchronized(this) {
- f = getFile(bpid, blockId);
+ f = getFile(bpid, blockId, false);
}
if(f != null ) {
@@ -1495,7 +1499,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
if (v.isTransientStorage()) {
- lazyWriteReplicaTracker.discardReplica(bpid, invalidBlks[i].getBlockId(), true);
+ ramDiskReplicaTracker.discardReplica(bpid, invalidBlks[i].getBlockId(), true);
}
// If a DFSClient has the replica in its cache of short-circuit file
@@ -1627,7 +1631,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override // FsDatasetSpi
public synchronized boolean contains(final ExtendedBlock block) {
final long blockId = block.getLocalBlock().getBlockId();
- return getFile(block.getBlockPoolId(), blockId) != null;
+ return getFile(block.getBlockPoolId(), blockId, false) != null;
}
/**
@@ -1636,9 +1640,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
* @param blockId a block's id
* @return on disk data file path; null if the replica does not exist
*/
- File getFile(final String bpid, final long blockId) {
+ File getFile(final String bpid, final long blockId, boolean touch) {
ReplicaInfo info = volumeMap.get(bpid, blockId);
if (info != null) {
+ if (touch && info.getVolume().isTransientStorage()) {
+ ramDiskReplicaTracker.touch(bpid, blockId);
+ }
return info.getBlockFile();
}
return null;
@@ -1807,7 +1814,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
blockScanner.deleteBlock(bpid, new Block(blockId));
}
if (vol.isTransientStorage()) {
- lazyWriteReplicaTracker.discardReplica(bpid, blockId, true);
+ ramDiskReplicaTracker.discardReplica(bpid, blockId, true);
}
LOG.warn("Removed block " + blockId
+ " from memory with missing block file on the disk");
@@ -1829,11 +1836,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
diskFile.length(), diskGS, vol, diskFile.getParentFile());
volumeMap.add(bpid, diskBlockInfo);
final DataBlockScanner blockScanner = datanode.getBlockScanner();
- if (blockScanner != null) {
- blockScanner.addBlock(new ExtendedBlock(bpid, diskBlockInfo));
- }
- if (vol.isTransientStorage()) {
- lazyWriteReplicaTracker.addReplica(bpid, blockId, (FsVolumeImpl) vol);
+ if (!vol.isTransientStorage()) {
+ if (blockScanner != null) {
+ blockScanner.addBlock(new ExtendedBlock(bpid, diskBlockInfo));
+ }
+ } else {
+ ramDiskReplicaTracker.addReplica(bpid, blockId, (FsVolumeImpl) vol);
}
LOG.warn("Added missing block to memory " + diskBlockInfo);
return;
@@ -2116,7 +2124,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
volumes.addBlockPool(bpid, conf);
volumeMap.initBlockPool(bpid);
}
- volumes.getAllVolumesMap(bpid, volumeMap, lazyWriteReplicaTracker);
+ volumes.getAllVolumesMap(bpid, volumeMap, ramDiskReplicaTracker);
}
@Override
@@ -2346,7 +2354,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
LOG.debug("LazyWriter starting to save blockId=" + blockId + "; bpid=" + bpid);
}
- lazyWriteReplicaTracker.recordStartLazyPersist(bpid, blockId, targetVolume);
+ ramDiskReplicaTracker.recordStartLazyPersist(bpid, blockId, targetVolume);
bpSlice = targetVolume.getBlockPoolSlice(bpid);
srcMeta = replicaInfo.getMetaFile();
srcFile = replicaInfo.getBlockFile();
@@ -2358,7 +2366,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
bpSlice.lazyPersistReplica(blockId, genStamp, srcMeta, srcFile);
synchronized (FsDatasetImpl.this) {
- lazyWriteReplicaTracker.recordEndLazyPersist(bpid, blockId, savedFiles);
+ ramDiskReplicaTracker.recordEndLazyPersist(bpid, blockId, savedFiles);
if (LOG.isDebugEnabled()) {
LOG.debug("LazyWriter finished saving blockId=" + blockId + "; bpid=" + bpid +
@@ -2373,21 +2381,21 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
* @return true if there is more work to be done, false otherwise.
*/
private boolean saveNextReplica() {
- LazyWriteReplicaTracker.ReplicaState replicaState = null;
+ RamDiskReplica block = null;
boolean succeeded = false;
try {
- replicaState = lazyWriteReplicaTracker.dequeueNextReplicaToPersist();
- if (replicaState != null) {
- moveReplicaToNewVolume(replicaState.bpid, replicaState.blockId);
+ block = ramDiskReplicaTracker.dequeueNextReplicaToPersist();
+ if (block != null) {
+ moveReplicaToNewVolume(block.getBlockPoolId(), block.getBlockId());
}
succeeded = true;
} catch(IOException ioe) {
- LOG.warn("Exception saving replica " + replicaState, ioe);
+ LOG.warn("Exception saving replica " + block, ioe);
} finally {
- if (!succeeded && replicaState != null) {
- LOG.warn("Failed to save replica " + replicaState + ". re-enqueueing it.");
- lazyWriteReplicaTracker.reenqueueReplicaNotPersisted(replicaState);
+ if (!succeeded && block != null) {
+ LOG.warn("Failed to save replica " + block + ". re-enqueueing it.");
+ ramDiskReplicaTracker.reenqueueReplicaNotPersisted(block);
}
}
@@ -2425,8 +2433,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
while (iterations++ < MAX_BLOCK_EVICTIONS_PER_ITERATION &&
transientFreeSpaceBelowThreshold()) {
- LazyWriteReplicaTracker.ReplicaState replicaState =
- lazyWriteReplicaTracker.getNextCandidateForEviction();
+ RamDiskReplica replicaState = ramDiskReplicaTracker.getNextCandidateForEviction();
if (replicaState == null) {
break;
@@ -2439,46 +2446,48 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
ReplicaInfo replicaInfo, newReplicaInfo;
File blockFile, metaFile;
long blockFileUsed, metaFileUsed;
+ final String bpid = replicaState.getBlockPoolId();
synchronized (FsDatasetImpl.this) {
- replicaInfo = getReplicaInfo(replicaState.bpid, replicaState.blockId);
+ replicaInfo = getReplicaInfo(replicaState.getBlockPoolId(), replicaState.getBlockId());
Preconditions.checkState(replicaInfo.getVolume().isTransientStorage());
blockFile = replicaInfo.getBlockFile();
metaFile = replicaInfo.getMetaFile();
blockFileUsed = blockFile.length();
metaFileUsed = metaFile.length();
- lazyWriteReplicaTracker.discardReplica(replicaState, false);
+ ramDiskReplicaTracker.discardReplica(replicaState, false);
// Move the replica from lazyPersist/ to finalized/ on target volume
BlockPoolSlice bpSlice =
- replicaState.lazyPersistVolume.getBlockPoolSlice(replicaState.bpid);
+ replicaState.getLazyPersistVolume().getBlockPoolSlice(bpid);
File newBlockFile = bpSlice.activateSavedReplica(
- replicaInfo, replicaState.savedBlockFile);
+ replicaInfo, replicaState.getSavedMetaFile(),
+ replicaState.getSavedBlockFile());
newReplicaInfo =
new FinalizedReplica(replicaInfo.getBlockId(),
replicaInfo.getBytesOnDisk(),
replicaInfo.getGenerationStamp(),
- replicaState.lazyPersistVolume,
+ replicaState.getLazyPersistVolume(),
newBlockFile.getParentFile());
// Update the volumeMap entry.
- volumeMap.add(replicaState.bpid, newReplicaInfo);
+ volumeMap.add(bpid, newReplicaInfo);
}
// Before deleting the files from transient storage we must notify the
// NN that the files are on the new storage. Else a blockReport from
// the transient storage might cause the NN to think the blocks are lost.
ExtendedBlock extendedBlock =
- new ExtendedBlock(replicaState.bpid, newReplicaInfo);
+ new ExtendedBlock(bpid, newReplicaInfo);
datanode.notifyNamenodeReceivedBlock(
extendedBlock, null, newReplicaInfo.getStorageUuid());
// Remove the old replicas from transient storage.
if (blockFile.delete() || !blockFile.exists()) {
- ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(replicaState.bpid, blockFileUsed);
+ ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(bpid, blockFileUsed);
if (metaFile.delete() || !metaFile.exists()) {
- ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(replicaState.bpid, metaFileUsed);
+ ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(bpid, metaFileUsed);
}
}
@@ -2499,7 +2508,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
// Sleep if we have no more work to do or if it looks like we are not
// making any forward progress. This is to ensure that if all persist
// operations are failing we don't keep retrying them in a tight loop.
- if (numSuccessiveFailures >= lazyWriteReplicaTracker.numReplicasNotPersisted()) {
+ if (numSuccessiveFailures >= ramDiskReplicaTracker.numReplicasNotPersisted()) {
Thread.sleep(checkpointerInterval * 1000);
numSuccessiveFailures = 0;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2d5ed36/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index 149ca27..3eed38c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -235,9 +235,6 @@ public class FsVolumeImpl implements FsVolumeSpi {
@Override
public void reserveSpaceForRbw(long bytesToReserve) {
if (bytesToReserve != 0) {
- if (FsDatasetImpl.LOG.isDebugEnabled()) {
- FsDatasetImpl.LOG.debug("Reserving " + bytesToReserve + " on volume " + getBasePath());
- }
reservedForRbw.addAndGet(bytesToReserve);
}
}
@@ -245,9 +242,6 @@ public class FsVolumeImpl implements FsVolumeSpi {
@Override
public void releaseReservedSpace(long bytesToRelease) {
if (bytesToRelease != 0) {
- if (FsDatasetImpl.LOG.isDebugEnabled()) {
- FsDatasetImpl.LOG.debug("Releasing " + bytesToRelease + " on volume " + getBasePath());
- }
long oldReservation, newReservation;
do {
@@ -298,17 +292,17 @@ public class FsVolumeImpl implements FsVolumeSpi {
}
void getVolumeMap(ReplicaMap volumeMap,
- final LazyWriteReplicaTracker lazyWriteReplicaMap)
+ final RamDiskReplicaTracker ramDiskReplicaMap)
throws IOException {
for(BlockPoolSlice s : bpSlices.values()) {
- s.getVolumeMap(volumeMap, lazyWriteReplicaMap);
+ s.getVolumeMap(volumeMap, ramDiskReplicaMap);
}
}
void getVolumeMap(String bpid, ReplicaMap volumeMap,
- final LazyWriteReplicaTracker lazyWriteReplicaMap)
+ final RamDiskReplicaTracker ramDiskReplicaMap)
throws IOException {
- getBlockPoolSlice(bpid).getVolumeMap(volumeMap, lazyWriteReplicaMap);
+ getBlockPoolSlice(bpid).getVolumeMap(volumeMap, ramDiskReplicaMap);
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2d5ed36/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
index 82fe35f..837ddf7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
@@ -121,7 +121,7 @@ class FsVolumeList {
void getAllVolumesMap(final String bpid,
final ReplicaMap volumeMap,
- final LazyWriteReplicaTracker lazyWriteReplicaMap)
+ final RamDiskReplicaTracker ramDiskReplicaMap)
throws IOException {
long totalStartTime = Time.monotonicNow();
final List<IOException> exceptions = Collections.synchronizedList(
@@ -134,7 +134,7 @@ class FsVolumeList {
FsDatasetImpl.LOG.info("Adding replicas to map for block pool " +
bpid + " on volume " + v + "...");
long startTime = Time.monotonicNow();
- v.getVolumeMap(bpid, volumeMap, lazyWriteReplicaMap);
+ v.getVolumeMap(bpid, volumeMap, ramDiskReplicaMap);
long timeTaken = Time.monotonicNow() - startTime;
FsDatasetImpl.LOG.info("Time to add replicas to map for block pool"
+ " " + bpid + " on volume " + v + ": " + timeTaken + "ms");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2d5ed36/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java
deleted file mode 100644
index e8d9c5c..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java
+++ /dev/null
@@ -1,268 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
-
-
-import com.google.common.collect.TreeMultimap;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
-
-import java.io.File;
-import java.util.*;
-
-class LazyWriteReplicaTracker {
-
- enum State {
- IN_MEMORY,
- LAZY_PERSIST_IN_PROGRESS,
- LAZY_PERSIST_COMPLETE,
- }
-
- static class ReplicaState implements Comparable<ReplicaState> {
-
- final String bpid;
- final long blockId;
- State state;
-
- /**
- * transient storage volume that holds the original replica.
- */
- final FsVolumeSpi transientVolume;
-
- /**
- * Persistent volume that holds or will hold the saved replica.
- */
- FsVolumeImpl lazyPersistVolume;
- File savedMetaFile;
- File savedBlockFile;
-
- ReplicaState(final String bpid, final long blockId, FsVolumeSpi transientVolume) {
- this.bpid = bpid;
- this.blockId = blockId;
- this.transientVolume = transientVolume;
- state = State.IN_MEMORY;
- lazyPersistVolume = null;
- savedMetaFile = null;
- savedBlockFile = null;
- }
-
- void deleteSavedFiles() {
- try {
- if (savedBlockFile != null) {
- savedBlockFile.delete();
- savedBlockFile = null;
- }
-
- if (savedMetaFile != null) {
- savedMetaFile.delete();
- savedMetaFile = null;
- }
- } catch (Throwable t) {
- // Ignore any exceptions.
- }
- }
-
- @Override
- public String toString() {
- return "[Bpid=" + bpid + ";blockId=" + blockId + "]";
- }
-
- @Override
- public int hashCode() {
- return bpid.hashCode() ^ (int) blockId;
- }
-
- @Override
- public boolean equals(Object other) {
- if (this == other) {
- return true;
- }
-
- if (other == null || getClass() != other.getClass()) {
- return false;
- }
-
- ReplicaState otherState = (ReplicaState) other;
- return (otherState.bpid.equals(bpid) && otherState.blockId == blockId);
- }
-
- @Override
- public int compareTo(ReplicaState other) {
- if (blockId == other.blockId) {
- return 0;
- } else if (blockId < other.blockId) {
- return -1;
- } else {
- return 1;
- }
- }
- }
-
- final FsDatasetImpl fsDataset;
-
- /**
- * Map of blockpool ID to map of blockID to ReplicaInfo.
- */
- final Map<String, Map<Long, ReplicaState>> replicaMaps;
-
- /**
- * Queue of replicas that need to be written to disk.
- * Stale entries are GC'd by dequeueNextReplicaToPersist.
- */
- final Queue<ReplicaState> replicasNotPersisted;
-
- /**
- * Queue of replicas in the order in which they were persisted.
- * We'll dequeue them in the same order.
- * We can improve the eviction scheme later.
- * Stale entries are GC'd by getNextCandidateForEviction.
- */
- final Queue<ReplicaState> replicasPersisted;
-
- LazyWriteReplicaTracker(final FsDatasetImpl fsDataset) {
- this.fsDataset = fsDataset;
- replicaMaps = new HashMap<String, Map<Long, ReplicaState>>();
- replicasNotPersisted = new LinkedList<ReplicaState>();
- replicasPersisted = new LinkedList<ReplicaState>();
- }
-
- synchronized void addReplica(String bpid, long blockId,
- final FsVolumeSpi transientVolume) {
- Map<Long, ReplicaState> map = replicaMaps.get(bpid);
- if (map == null) {
- map = new HashMap<Long, ReplicaState>();
- replicaMaps.put(bpid, map);
- }
- ReplicaState replicaState = new ReplicaState(bpid, blockId, transientVolume);
- map.put(blockId, replicaState);
- replicasNotPersisted.add(replicaState);
- }
-
- synchronized void recordStartLazyPersist(
- final String bpid, final long blockId, FsVolumeImpl checkpointVolume) {
- Map<Long, ReplicaState> map = replicaMaps.get(bpid);
- ReplicaState replicaState = map.get(blockId);
- replicaState.state = State.LAZY_PERSIST_IN_PROGRESS;
- replicaState.lazyPersistVolume = checkpointVolume;
- }
-
- /**
- * @param bpid
- * @param blockId
- * @param savedFiles The saved meta and block files, in that order.
- */
- synchronized void recordEndLazyPersist(
- final String bpid, final long blockId, final File[] savedFiles) {
- Map<Long, ReplicaState> map = replicaMaps.get(bpid);
- ReplicaState replicaState = map.get(blockId);
-
- if (replicaState == null) {
- throw new IllegalStateException("Unknown replica bpid=" +
- bpid + "; blockId=" + blockId);
- }
- replicaState.state = State.LAZY_PERSIST_COMPLETE;
- replicaState.savedMetaFile = savedFiles[0];
- replicaState.savedBlockFile = savedFiles[1];
-
- if (replicasNotPersisted.peek() == replicaState) {
- // Common case.
- replicasNotPersisted.remove();
- } else {
- // Should never occur in practice as lazy writer always persists
- // the replica at the head of the queue before moving to the next
- // one.
- replicasNotPersisted.remove(replicaState);
- }
-
- replicasPersisted.add(replicaState);
- }
-
- synchronized ReplicaState dequeueNextReplicaToPersist() {
- while (replicasNotPersisted.size() != 0) {
- ReplicaState replicaState = replicasNotPersisted.remove();
- Map<Long, ReplicaState> replicaMap = replicaMaps.get(replicaState.bpid);
-
- if (replicaMap != null && replicaMap.get(replicaState.blockId) != null) {
- return replicaState;
- }
-
- // The replica no longer exists, look for the next one.
- }
- return null;
- }
-
- synchronized void reenqueueReplicaNotPersisted(final ReplicaState replicaState) {
- replicasNotPersisted.add(replicaState);
- }
-
- synchronized void reenqueueReplicaPersisted(final ReplicaState replicaState) {
- replicasPersisted.add(replicaState);
- }
-
- synchronized int numReplicasNotPersisted() {
- return replicasNotPersisted.size();
- }
-
- synchronized ReplicaState getNextCandidateForEviction() {
- while (replicasPersisted.size() != 0) {
- ReplicaState replicaState = replicasPersisted.remove();
- Map<Long, ReplicaState> replicaMap = replicaMaps.get(replicaState.bpid);
-
- if (replicaMap != null && replicaMap.get(replicaState.blockId) != null) {
- return replicaState;
- }
-
- // The replica no longer exists, look for the next one.
- }
- return null;
- }
-
- void discardReplica(ReplicaState replicaState, boolean deleteSavedCopies) {
- discardReplica(replicaState.bpid, replicaState.blockId, deleteSavedCopies);
- }
-
- /**
- * Discard any state we are tracking for the given replica. This could mean
- * the block is either deleted from the block space or the replica is no longer
- * on transient storage.
- *
- * @param deleteSavedCopies true if we should delete the saved copies on
- * persistent storage. This should be set by the
- * caller when the block is no longer needed.
- */
- synchronized void discardReplica(
- final String bpid, final long blockId,
- boolean deleteSavedCopies) {
- Map<Long, ReplicaState> map = replicaMaps.get(bpid);
-
- if (map == null) {
- return;
- }
-
- ReplicaState replicaState = map.get(blockId);
-
- if (replicaState == null) {
- return;
- }
-
- if (deleteSavedCopies) {
- replicaState.deleteSavedFiles();
- }
- map.remove(blockId);
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2d5ed36/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java
new file mode 100644
index 0000000..0899e70
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaLruTracker.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+
+import com.google.common.collect.TreeMultimap;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import java.io.File;
+import java.util.*;
+
+/**
+ * An implementation of RamDiskReplicaTracker that uses an LRU
+ * eviction scheme.
+ *
+ * <p>Replicas that still need to be written to persistent storage wait in
+ * a FIFO queue. Once persisted, a replica becomes an eviction candidate,
+ * ordered by the time it was last used, so the least recently used
+ * persisted replica is offered for eviction first.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class RamDiskReplicaLruTracker extends RamDiskReplicaTracker {
+
+  /**
+   * Replica state plus the time it was last used. Static, since it never
+   * needs a reference to the enclosing tracker.
+   */
+  private static class RamDiskReplicaLru extends RamDiskReplica {
+    /** Wall-clock time (ms) of the last use; 0 until first persisted. */
+    long lastUsedTime;
+
+    private RamDiskReplicaLru(String bpid, long blockId,
+        FsVolumeImpl ramDiskVolume) {
+      super(bpid, blockId, ramDiskVolume);
+    }
+  }
+
+  /**
+   * Map of block pool ID to (map of block ID to replica state).
+   */
+  final Map<String, Map<Long, RamDiskReplicaLru>> replicaMaps;
+
+  /**
+   * Queue of replicas that need to be written to disk.
+   * Stale entries are GC'd by dequeueNextReplicaToPersist.
+   */
+  final Queue<RamDiskReplicaLru> replicasNotPersisted;
+
+  /**
+   * Persisted replicas (the eviction candidates), keyed and ordered by
+   * their last use times.
+   */
+  final TreeMultimap<Long, RamDiskReplicaLru> replicasPersisted;
+
+  RamDiskReplicaLruTracker() {
+    replicaMaps = new HashMap<String, Map<Long, RamDiskReplicaLru>>();
+    replicasNotPersisted = new LinkedList<RamDiskReplicaLru>();
+    replicasPersisted = TreeMultimap.create();
+  }
+
+  /**
+   * Look up the tracked state for a replica. Only called with the
+   * tracker's monitor held.
+   *
+   * @return the replica state, or null if either the block pool or the
+   *         replica is not being tracked.
+   */
+  private RamDiskReplicaLru lookupReplica(final String bpid,
+      final long blockId) {
+    final Map<Long, RamDiskReplicaLru> map = replicaMaps.get(bpid);
+    return map == null ? null : map.get(blockId);
+  }
+
+  /**
+   * Like {@link #lookupReplica} but fails fast for an unknown replica.
+   *
+   * @throws IllegalStateException if the replica is not being tracked.
+   */
+  private RamDiskReplicaLru getReplicaOrThrow(final String bpid,
+      final long blockId) {
+    final RamDiskReplicaLru replica = lookupReplica(bpid, blockId);
+    if (replica == null) {
+      throw new IllegalStateException("Unknown replica bpid=" +
+          bpid + "; blockId=" + blockId);
+    }
+    return replica;
+  }
+
+  @Override
+  synchronized void addReplica(final String bpid, final long blockId,
+      final FsVolumeImpl transientVolume) {
+    Map<Long, RamDiskReplicaLru> map = replicaMaps.get(bpid);
+    if (map == null) {
+      map = new HashMap<Long, RamDiskReplicaLru>();
+      replicaMaps.put(bpid, map);
+    }
+    final RamDiskReplicaLru ramDiskReplicaLru =
+        new RamDiskReplicaLru(bpid, blockId, transientVolume);
+    map.put(blockId, ramDiskReplicaLru);
+    replicasNotPersisted.add(ramDiskReplicaLru);
+  }
+
+  @Override
+  synchronized void touch(final String bpid, final long blockId) {
+    final RamDiskReplicaLru ramDiskReplicaLru = lookupReplica(bpid, blockId);
+    if (ramDiskReplicaLru == null) {
+      // Unknown block pool or replica: nothing to update.
+      return;
+    }
+
+    // Only persisted replicas are eviction candidates. Reinsert such a
+    // replica under its new timestamp to keep the LRU order correct.
+    if (replicasPersisted.remove(ramDiskReplicaLru.lastUsedTime,
+        ramDiskReplicaLru)) {
+      ramDiskReplicaLru.lastUsedTime = System.currentTimeMillis();
+      replicasPersisted.put(ramDiskReplicaLru.lastUsedTime, ramDiskReplicaLru);
+    }
+  }
+
+  @Override
+  synchronized void recordStartLazyPersist(
+      final String bpid, final long blockId, FsVolumeImpl checkpointVolume) {
+    getReplicaOrThrow(bpid, blockId).setLazyPersistVolume(checkpointVolume);
+  }
+
+  @Override
+  synchronized void recordEndLazyPersist(
+      final String bpid, final long blockId, final File[] savedFiles) {
+    final RamDiskReplicaLru ramDiskReplicaLru =
+        getReplicaOrThrow(bpid, blockId);
+    ramDiskReplicaLru.recordSavedBlockFiles(savedFiles);
+
+    // Drop the replica from the pending queue. Nothing is removed if
+    // dequeueNextReplicaToPersist already took it off; otherwise use the
+    // O(1) head removal when possible and an O(n) scan as a fallback.
+    if (replicasNotPersisted.peek() == ramDiskReplicaLru) {
+      replicasNotPersisted.remove();
+    } else {
+      replicasNotPersisted.remove(ramDiskReplicaLru);
+    }
+
+    // Remove any entry left by an earlier persist of this replica so it
+    // never appears twice under different timestamps, then insert it as
+    // the most recently used eviction candidate.
+    replicasPersisted.remove(ramDiskReplicaLru.lastUsedTime, ramDiskReplicaLru);
+    ramDiskReplicaLru.lastUsedTime = System.currentTimeMillis();
+    replicasPersisted.put(ramDiskReplicaLru.lastUsedTime, ramDiskReplicaLru);
+  }
+
+  @Override
+  synchronized RamDiskReplicaLru dequeueNextReplicaToPersist() {
+    while (!replicasNotPersisted.isEmpty()) {
+      final RamDiskReplicaLru ramDiskReplicaLru = replicasNotPersisted.remove();
+      if (lookupReplica(ramDiskReplicaLru.getBlockPoolId(),
+          ramDiskReplicaLru.getBlockId()) != null) {
+        return ramDiskReplicaLru;
+      }
+      // The replica no longer exists, look for the next one.
+    }
+    return null;
+  }
+
+  @Override
+  synchronized void reenqueueReplicaNotPersisted(
+      final RamDiskReplica ramDiskReplicaLru) {
+    replicasNotPersisted.add((RamDiskReplicaLru) ramDiskReplicaLru);
+  }
+
+  @Override
+  synchronized int numReplicasNotPersisted() {
+    return replicasNotPersisted.size();
+  }
+
+  @Override
+  synchronized RamDiskReplicaLru getNextCandidateForEviction() {
+    // Values iterate in ascending order of last use time, oldest first.
+    final Iterator<RamDiskReplicaLru> it =
+        replicasPersisted.values().iterator();
+    while (it.hasNext()) {
+      final RamDiskReplicaLru ramDiskReplicaLru = it.next();
+      it.remove();
+
+      if (lookupReplica(ramDiskReplicaLru.getBlockPoolId(),
+          ramDiskReplicaLru.getBlockId()) != null) {
+        return ramDiskReplicaLru;
+      }
+      // The replica no longer exists, look for the next one.
+    }
+    return null;
+  }
+
+  /**
+   * Discard any state we are tracking for the given replica. This could mean
+   * the block is either deleted from the block space or the replica is no longer
+   * on transient storage.
+   *
+   * @param deleteSavedCopies true if we should delete the saved copies on
+   *                          persistent storage. This should be set by the
+   *                          caller when the block is no longer needed.
+   */
+  @Override
+  synchronized void discardReplica(
+      final String bpid, final long blockId,
+      boolean deleteSavedCopies) {
+    final Map<Long, RamDiskReplicaLru> map = replicaMaps.get(bpid);
+    if (map == null) {
+      return;
+    }
+
+    final RamDiskReplicaLru ramDiskReplicaLru = map.get(blockId);
+    if (ramDiskReplicaLru == null) {
+      return;
+    }
+
+    if (deleteSavedCopies) {
+      ramDiskReplicaLru.deleteSavedFiles();
+    }
+
+    map.remove(blockId);
+    replicasPersisted.remove(ramDiskReplicaLru.lastUsedTime, ramDiskReplicaLru);
+
+    // replicasNotPersisted will be lazily GC'ed.
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2d5ed36/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaTracker.java
new file mode 100644
index 0000000..03fc068
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskReplicaTracker.java
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import java.io.File;
+
+/**
+ * Tracks replicas held on RAM disk: which ones are still waiting to be
+ * lazily persisted and which persisted ones may be evicted. Subclasses
+ * implement the eviction scheme; {@link #getInstance} creates the
+ * configured implementation.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public abstract class RamDiskReplicaTracker {
+
+  FsDatasetImpl fsDataset;
+
+  /**
+   * State tracked for one replica on RAM disk, identified by its block
+   * pool ID and block ID.
+   */
+  static class RamDiskReplica implements Comparable<RamDiskReplica> {
+    private final String bpid;
+    private final long blockId;
+    private File savedBlockFile;
+    private File savedMetaFile;
+
+    /**
+     * RAM_DISK volume that holds the original replica.
+     */
+    final FsVolumeSpi ramDiskVolume;
+
+    /**
+     * Persistent volume that holds or will hold the saved replica.
+     */
+    FsVolumeImpl lazyPersistVolume;
+
+    RamDiskReplica(final String bpid, final long blockId,
+                   final FsVolumeImpl ramDiskVolume) {
+      this.bpid = bpid;
+      this.blockId = blockId;
+      this.ramDiskVolume = ramDiskVolume;
+      lazyPersistVolume = null;
+      savedMetaFile = null;
+      savedBlockFile = null;
+    }
+
+    long getBlockId() {
+      return blockId;
+    }
+
+    String getBlockPoolId() {
+      return bpid;
+    }
+
+    FsVolumeImpl getLazyPersistVolume() {
+      return lazyPersistVolume;
+    }
+
+    /**
+     * Set the persistent volume that holds or will hold the saved replica.
+     *
+     * @throws IllegalStateException if the volume is transient storage.
+     */
+    void setLazyPersistVolume(FsVolumeImpl volume) {
+      Preconditions.checkState(!volume.isTransientStorage());
+      this.lazyPersistVolume = volume;
+    }
+
+    File getSavedBlockFile() {
+      return savedBlockFile;
+    }
+
+    File getSavedMetaFile() {
+      return savedMetaFile;
+    }
+
+    /**
+     * Record the saved meta and block files on the given volume.
+     *
+     * @param files Meta and block files, in that order.
+     */
+    void recordSavedBlockFiles(File[] files) {
+      this.savedMetaFile = files[0];
+      this.savedBlockFile = files[1];
+    }
+
+    @Override
+    public int hashCode() {
+      // Fold in the high 32 bits of the block ID too, as Long#hashCode does.
+      return bpid.hashCode() ^ (int) (blockId ^ (blockId >>> 32));
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (this == other) {
+        return true;
+      }
+
+      if (other == null || getClass() != other.getClass()) {
+        return false;
+      }
+
+      RamDiskReplica otherState = (RamDiskReplica) other;
+      return (otherState.bpid.equals(bpid) && otherState.blockId == blockId);
+    }
+
+    /**
+     * Delete the saved meta and block files. Failure to delete can be
+     * ignored, the directory scanner will retry the deletion later. Each
+     * file is attempted independently so that a failure on one does not
+     * skip the other.
+     */
+    void deleteSavedFiles() {
+      deleteQuietly(savedBlockFile);
+      savedBlockFile = null;
+      deleteQuietly(savedMetaFile);
+      savedMetaFile = null;
+    }
+
+    /**
+     * Best-effort delete of a possibly-null file. File#delete reports
+     * failure via its return value and only throws SecurityException.
+     */
+    private static void deleteQuietly(final File file) {
+      if (file == null) {
+        return;
+      }
+      try {
+        file.delete();
+      } catch (SecurityException e) {
+        // Ignored: the deletion is best-effort, see deleteSavedFiles.
+      }
+    }
+
+    @Override
+    public int compareTo(RamDiskReplica other) {
+      final int bpidResult = bpid.compareTo(other.bpid);
+      if (bpidResult != 0) {
+        return bpidResult;
+      }
+      if (blockId == other.blockId) {
+        return 0;
+      }
+      return blockId < other.blockId ? -1 : 1;
+    }
+
+    @Override
+    public String toString() {
+      return "[BlockPoolID=" + bpid + "; BlockId=" + blockId + "]";
+    }
+  }
+
+  /**
+   * Get an instance of the configured RamDiskReplicaTracker based on the
+   * configuration property
+   * {@link org.apache.hadoop.hdfs.DFSConfigKeys#DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_KEY}.
+   *
+   * @param conf the configuration to be used
+   * @param fsDataset the FsDataset object.
+   * @return an instance of RamDiskReplicaTracker
+   */
+  static RamDiskReplicaTracker getInstance(final Configuration conf,
+                                           final FsDatasetImpl fsDataset) {
+    final Class<? extends RamDiskReplicaTracker> trackerClass = conf.getClass(
+        DFSConfigKeys.DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_KEY,
+        DFSConfigKeys.DFS_DATANODE_RAM_DISK_REPLICA_TRACKER_DEFAULT,
+        RamDiskReplicaTracker.class);
+    final RamDiskReplicaTracker tracker = ReflectionUtils.newInstance(
+        trackerClass, conf);
+    tracker.initialize(fsDataset);
+    return tracker;
+  }
+
+  void initialize(final FsDatasetImpl fsDataset) {
+    this.fsDataset = fsDataset;
+  }
+
+  /**
+   * Start tracking a new finalized replica on RAM disk.
+   *
+   * @param transientVolume RAM disk volume that stores the replica.
+   */
+  abstract void addReplica(final String bpid, final long blockId,
+                           final FsVolumeImpl transientVolume);
+
+  /**
+   * Invoked when a replica is opened by a client. This may be used as
+   * a heuristic by the eviction scheme.
+   */
+  abstract void touch(final String bpid, final long blockId);
+
+  /**
+   * Get the next replica to write to persistent storage.
+   */
+  abstract RamDiskReplica dequeueNextReplicaToPersist();
+
+  /**
+   * Invoked if a replica that was previously dequeued for persistence
+   * could not be successfully persisted. Add it back so it can be retried
+   * later.
+   */
+  abstract void reenqueueReplicaNotPersisted(
+      final RamDiskReplica ramDiskReplica);
+
+  /**
+   * Invoked when the Lazy persist operation is started by the DataNode.
+   *
+   * @param checkpointVolume persistent volume the replica will be saved to.
+   */
+  abstract void recordStartLazyPersist(
+      final String bpid, final long blockId, FsVolumeImpl checkpointVolume);
+
+  /**
+   * Invoked when the Lazy persist operation is complete.
+   *
+   * @param savedFiles The saved meta and block files, in that order.
+   */
+  abstract void recordEndLazyPersist(
+      final String bpid, final long blockId, final File[] savedFiles);
+
+  /**
+   * Return a candidate replica to remove from RAM Disk. The exact replica
+   * to be returned may depend on the eviction scheme utilized.
+   *
+   * @return the replica to evict next, or null if there is no candidate.
+   */
+  abstract RamDiskReplica getNextCandidateForEviction();
+
+  /**
+   * Return the number of replicas pending persistence to disk.
+   */
+  abstract int numReplicasNotPersisted();
+
+  /**
+   * Discard all state we are tracking for the given replica.
+   *
+   * @param deleteSavedCopies true to also delete the saved copies on
+   *                          persistent storage.
+   */
+  abstract void discardReplica(
+      final String bpid, final long blockId,
+      boolean deleteSavedCopies);
+
+  /**
+   * Convenience overload of {@link #discardReplica(String, long, boolean)}.
+   */
+  void discardReplica(RamDiskReplica replica, boolean deleteSavedCopies) {
+    discardReplica(replica.getBlockPoolId(), replica.getBlockId(),
+        deleteSavedCopies);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2d5ed36/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java
index 48ddcc2..f9e30e1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetTestUtil.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
public class FsDatasetTestUtil {
public static File getFile(FsDatasetSpi<?> fsd, String bpid, long bid) {
- return ((FsDatasetImpl)fsd).getFile(bpid, bid);
+ return ((FsDatasetImpl)fsd).getFile(bpid, bid, false);
}
public static File getBlockFile(FsDatasetSpi<?> fsd, String bpid, Block b
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2d5ed36/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
index 777779f..95404b3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
@@ -71,7 +71,7 @@ public class TestLazyPersistFiles {
private static final int THREADPOOL_SIZE = 10;
private static final short REPL_FACTOR = 1;
- private static final int BLOCK_SIZE = 10485760; // 10 MB
+ private static final int BLOCK_SIZE = 5 * 1024 * 1024;
private static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3;
private static final long HEARTBEAT_INTERVAL_SEC = 1;
private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
@@ -449,34 +449,51 @@ public class TestLazyPersistFiles {
* @throws InterruptedException
*/
@Test (timeout=300000)
- public void testRamDiskEvictionLRU()
+ public void testRamDiskEvictionIsLru()
throws IOException, InterruptedException {
- startUpCluster(true, 3);
+ final int NUM_PATHS = 5;
+ startUpCluster(true, NUM_PATHS + EVICTION_LOW_WATERMARK);
final String METHOD_NAME = GenericTestUtils.getMethodName();
- final int NUM_PATHS = 6;
- Path paths[] = new Path[NUM_PATHS];
+ Path paths[] = new Path[NUM_PATHS * 2];
- for (int i = 0; i < NUM_PATHS; i++) {
+ for (int i = 0; i < paths.length; i++) {
paths[i] = new Path("/" + METHOD_NAME + "." + i +".dat");
}
- // No eviction for the first half of files
- for (int i = 0; i < NUM_PATHS/2; i++) {
+ for (int i = 0; i < NUM_PATHS; i++) {
makeTestFile(paths[i], BLOCK_SIZE, true);
- ensureFileReplicasOnStorageType(paths[i], RAM_DISK);
}
- // Lazy persist writer persists the first half of files
+ // Sleep for a short time to allow the lazy writer thread to do its job.
Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
- // Create the second half of files with eviction upon each create.
- for (int i = NUM_PATHS/2; i < NUM_PATHS; i++) {
- makeTestFile(paths[i], BLOCK_SIZE, true);
+ for (int i = 0; i < NUM_PATHS; ++i) {
ensureFileReplicasOnStorageType(paths[i], RAM_DISK);
+ }
+
+ // Open the files for read in a random order.
+ ArrayList<Integer> indexes = new ArrayList<Integer>(NUM_PATHS);
+ for (int i = 0; i < NUM_PATHS; ++i) {
+ indexes.add(i);
+ }
+ Collections.shuffle(indexes);
- // path[i-NUM_PATHS/2] is expected to be evicted by LRU
+ for (int i = 0; i < NUM_PATHS; ++i) {
+ LOG.info("Touching file " + paths[indexes.get(i)]);
+ DFSTestUtil.readFile(fs, paths[indexes.get(i)]);
+ }
+
+ // Create an equal number of new files ensuring that the previous
+ // files are evicted in the same order they were read.
+ for (int i = 0; i < NUM_PATHS; ++i) {
+ makeTestFile(paths[i + NUM_PATHS], BLOCK_SIZE, true);
triggerBlockReport();
- ensureFileReplicasOnStorageType(paths[i - NUM_PATHS / 2], DEFAULT);
+ Thread.sleep(3000);
+ ensureFileReplicasOnStorageType(paths[i + NUM_PATHS], RAM_DISK);
+ ensureFileReplicasOnStorageType(paths[indexes.get(i)], DEFAULT);
+ for (int j = i + 1; j < NUM_PATHS; ++j) {
+ ensureFileReplicasOnStorageType(paths[indexes.get(j)], RAM_DISK);
+ }
}
}