Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2014/10/17 23:45:02 UTC
[15/34] git commit: HDFS-6930. Improve replica eviction from RAM disk. (Arpit Agarwal)
HDFS-6930. Improve replica eviction from RAM disk. (Arpit Agarwal)
Conflicts:
hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/21046d83
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/21046d83
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/21046d83
Branch: refs/heads/branch-2
Commit: 21046d83104948e2211472cfd720c5857b0b9748
Parents: 225ffdb
Author: arp <ar...@apache.org>
Authored: Wed Sep 3 13:53:01 2014 -0700
Committer: Jitendra Pandey <Ji...@Jitendra-Pandeys-MacBook-Pro-4.local>
Committed: Fri Oct 17 13:42:01 2014 -0700
----------------------------------------------------------------------
.../datanode/fsdataset/impl/FsDatasetImpl.java | 214 +++++++++----------
.../fsdataset/impl/LazyWriteReplicaTracker.java | 52 +++--
2 files changed, 137 insertions(+), 129 deletions(-)
----------------------------------------------------------------------
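In summary: the patch removes the reactive tryToEvictBlocks() path (deleted below), which walked an LRU map and deleted files on the writer's hot path while holding the dataset lock, and instead has the LazyWriter background thread evict persisted replicas proactively whenever free space on transient (RAM disk) storage falls below a low watermark. A minimal standalone sketch of the reworked LazyWriter loop follows; the class name, interval value, and stubbed method bodies are illustrative, not part of the patch:

    // A minimal sketch (not the patch itself) of the reworked LazyWriter
    // loop: persist one replica per pass, then evict, and back off when
    // no forward progress is being made.
    public class LazyWriterLoopSketch implements Runnable {
      private volatile boolean shouldRun = true;
      private final int checkpointerIntervalSec = 60; // illustrative interval

      boolean saveNextReplica() { return false; }   // stub: lazy-persist one replica
      void evictBlocks() { }                        // stub: evict while below watermark
      int numReplicasNotPersisted() { return 0; }   // stub: persist-queue length

      @Override
      public void run() {
        int numSuccessiveFailures = 0;
        while (shouldRun) {
          try {
            numSuccessiveFailures = saveNextReplica() ? 0 : (numSuccessiveFailures + 1);
            evictBlocks();
            // Sleep when there is no more work or no forward progress, so
            // failing persist operations are not retried in a tight loop.
            if (numSuccessiveFailures >= numReplicasNotPersisted()) {
              Thread.sleep(checkpointerIntervalSec * 1000L);
              numSuccessiveFailures = 0;
            }
          } catch (InterruptedException e) {
            break; // interrupted: exit, as the patched run() does
          }
        }
      }
    }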
http://git-wip-us.apache.org/repos/asf/hadoop/blob/21046d83/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 797dcc6..8d93dcc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -996,83 +996,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
}
- /**
- * Attempt to evict one or more transient block replicas we have at least
- * spaceNeeded bytes free.
- *
- * @return true if we were able to free up at least spaceNeeded bytes, false
- * otherwise.
- */
- private boolean tryToEvictBlocks(final String bpid, final long spaceNeeded)
- throws IOException {
-
- boolean isAvailable = false;
-
- LOG.info("Attempting to evict blocks from transient storage");
-
- // Reverse the map so we can iterate in order of replica creation times,
- // evicting oldest replicas one at a time until we have sufficient space.
- TreeMultimap<Long, LazyWriteReplicaTracker.ReplicaState> lruMap =
- lazyWriteReplicaTracker.getLruMap();
- int blocksEvicted = 0;
-
- // TODO: It is really inefficient to do this with the Object lock held!
- // TODO: This logic is here just for prototyping.
- // TODO: We should replace it with proactive discard when ram_disk free space
- // TODO: falls below a low watermark. That way we avoid fs operations on the
- // TODO: hot path with the lock held.
- synchronized (this) {
- long currentTime = System.currentTimeMillis() / 1000;
- for (Map.Entry<Long, LazyWriteReplicaTracker.ReplicaState> entry : lruMap.entries()) {
- LazyWriteReplicaTracker.ReplicaState lazyWriteReplica = entry.getValue();
- LOG.info("RAM_DISK: Evicting blockId=" + lazyWriteReplica.blockId +
- "; block LMT=" + entry.getKey() +
- "; currentTime=" + currentTime);
- ReplicaInfo replicaInfo = getReplicaInfo(bpid, lazyWriteReplica.blockId);
- Preconditions.checkState(replicaInfo.getVolume().isTransientStorage());
- File blockFile = replicaInfo.getBlockFile();
- File metaFile = replicaInfo.getMetaFile();
- long used = blockFile.length() + metaFile.length();
- lazyWriteReplicaTracker.discardReplica(bpid, entry.getValue().blockId, false);
-
- // Move the persisted replica to the finalized directory of
- // the target volume.
- BlockPoolSlice bpSlice =
- lazyWriteReplica.lazyPersistVolume.getBlockPoolSlice(bpid);
- File newBlockFile = bpSlice.activateSavedReplica(
- replicaInfo, lazyWriteReplica.savedBlockFile);
-
- ReplicaInfo newReplicaInfo =
- new FinalizedReplica(replicaInfo.getBlockId(),
- replicaInfo.getBytesOnDisk(),
- replicaInfo.getGenerationStamp(),
- lazyWriteReplica.lazyPersistVolume,
- newBlockFile.getParentFile());
-
- // Update the volumeMap entry. This removes the old entry.
- volumeMap.add(bpid, newReplicaInfo);
-
- // Remove the old replicas.
- blockFile.delete();
- metaFile.delete();
- ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(bpid, used);
- ++blocksEvicted;
-
- if (replicaInfo.getVolume().getAvailable() > spaceNeeded) {
- LOG.info("RAM_DISK: freed up " + spaceNeeded + " bytes for new block");
- isAvailable = true;
- break;
- }
-
- if (blocksEvicted == MAX_BLOCK_EVICTIONS_PER_ITERATION) {
- break;
- }
- }
- }
-
- return isAvailable;
- }
-
@Override // FsDatasetSpi
public synchronized ReplicaInPipeline createRbw(StorageType storageType,
ExtendedBlock b, boolean allowLazyPersist) throws IOException {
@@ -1095,13 +1018,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
} catch (DiskOutOfSpaceException de) {
if (allowLazyPersist) {
- if (!tryToEvictBlocks(b.getBlockPoolId(), b.getNumBytes())) {
- // Eviction did not work, we'll just fallback to DEFAULT storage.
- LOG.info("RAM_DISK: Failed to free up " + b.getNumBytes() +
- " bytes for new block. Will fallback to DEFAULT " +
- "storage");
- allowLazyPersist = false;
- }
+ allowLazyPersist = false;
continue;
}
throw de;
@@ -2376,20 +2293,26 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
class LazyWriter implements Runnable {
private volatile boolean shouldRun = true;
final int checkpointerInterval;
+ final long estimateBlockSize;
+
+ public static final int LOW_WATERMARK_FREE_SPACE_PERCENT = 10;
+ public static final int LOW_WATERMARK_FREE_SPACE_REPLICAS = 3;
+
public LazyWriter(final int checkpointerInterval) {
this.checkpointerInterval = checkpointerInterval;
+ this.estimateBlockSize = conf.getLongBytes(
+ DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
+ DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
}
private void moveReplicaToNewVolume(String bpid, long blockId)
throws IOException {
- LOG.info("LazyWriter invoked to save blockId=" + blockId + "; bpid=" + bpid);
-
FsVolumeImpl targetVolume;
ReplicaInfo replicaInfo;
- synchronized (this) {
+ synchronized (FsDatasetImpl.this) {
replicaInfo = volumeMap.get(bpid, blockId);
if (replicaInfo == null || !replicaInfo.getVolume().isTransientStorage()) {
@@ -2403,20 +2326,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
// Pick a target volume for the block.
targetVolume = volumes.getNextVolume(
StorageType.DEFAULT, replicaInfo.getNumBytes());
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("LazyWriter starting to save blockId=" + blockId + "; bpid=" + bpid);
- }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("LazyWriter starting to save blockId=" + blockId + "; bpid=" + bpid);
+ }
- lazyWriteReplicaTracker.recordStartLazyPersist(bpid, blockId, targetVolume);
- File savedBlockFile = targetVolume.getBlockPoolSlice(bpid)
- .lazyPersistReplica(replicaInfo);
- lazyWriteReplicaTracker.recordEndLazyPersist(bpid, blockId, savedBlockFile);
+ lazyWriteReplicaTracker.recordStartLazyPersist(bpid, blockId, targetVolume);
+ File savedBlockFile = targetVolume.getBlockPoolSlice(bpid)
+ .lazyPersistReplica(replicaInfo);
+ lazyWriteReplicaTracker.recordEndLazyPersist(bpid, blockId, savedBlockFile);
- if (LOG.isDebugEnabled()) {
- LOG.debug("LazyWriter finished saving blockId=" + blockId + "; bpid=" + bpid +
- " to file " + savedBlockFile);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("LazyWriter finished saving blockId=" + blockId + "; bpid=" + bpid +
+ " to file " + savedBlockFile);
+ }
}
}
@@ -2430,28 +2353,100 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
boolean succeeded = false;
try {
- synchronized (this) {
- replicaState = lazyWriteReplicaTracker.dequeueNextReplicaToPersist();
- if (replicaState == null) {
- return false;
- }
+ replicaState = lazyWriteReplicaTracker.dequeueNextReplicaToPersist();
+ if (replicaState != null) {
+ // Move the replica outside the lock.
+ moveReplicaToNewVolume(replicaState.bpid, replicaState.blockId);
}
-
- // Move the replica outside the lock.
- moveReplicaToNewVolume(replicaState.bpid, replicaState.blockId);
succeeded = true;
} catch(IOException ioe) {
LOG.warn("Exception saving replica " + replicaState, ioe);
} finally {
if (!succeeded && replicaState != null) {
LOG.warn("Failed to save replica " + replicaState + ". re-enqueueing it.");
- lazyWriteReplicaTracker.reenqueueReplica(replicaState);
+ lazyWriteReplicaTracker.reenqueueReplicaNotPersisted(replicaState);
}
}
return succeeded;
}
+ private boolean transientFreeSpaceBelowThreshold() throws IOException {
+ long free = 0;
+ long capacity = 0;
+
+ // Don't worry about fragmentation for now. We don't expect more than one
+ // transient volume per DN.
+ for (FsVolumeImpl v : volumes.volumes) {
+ if (v.isTransientStorage()) {
+ capacity += v.getCapacity();
+ free += v.getAvailable();
+ }
+ }
+
+ if (capacity == 0) {
+ return false;
+ }
+
+ int percentFree = (int) (free * 100 / capacity);
+ return percentFree < LOW_WATERMARK_FREE_SPACE_PERCENT ||
+ free < (estimateBlockSize * LOW_WATERMARK_FREE_SPACE_REPLICAS);
+ }
+
+ /**
+ * Attempt to evict one or more transient block replicas until free
+ * space on transient storage is back above the low watermark.
+ */
+ private synchronized void evictBlocks() throws IOException {
+ int iterations = 0;
+
+ LazyWriteReplicaTracker.ReplicaState replicaState =
+ lazyWriteReplicaTracker.getNextCandidateForEviction();
+
+ while (replicaState != null &&
+ iterations++ < MAX_BLOCK_EVICTIONS_PER_ITERATION &&
+ transientFreeSpaceBelowThreshold()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Evicting block " + replicaState);
+ }
+ ReplicaInfo replicaInfo = getReplicaInfo(replicaState.bpid, replicaState.blockId);
+ Preconditions.checkState(replicaInfo.getVolume().isTransientStorage());
+ File blockFile = replicaInfo.getBlockFile();
+ File metaFile = replicaInfo.getMetaFile();
+ long blockFileUsed = blockFile.length();
+ long metaFileUsed = metaFile.length();
+ lazyWriteReplicaTracker.discardReplica(replicaState, false);
+
+ // Move the replica from lazyPersist/ to finalized/ on target volume
+ BlockPoolSlice bpSlice =
+ replicaState.lazyPersistVolume.getBlockPoolSlice(replicaState.bpid);
+ File newBlockFile = bpSlice.activateSavedReplica(
+ replicaInfo, replicaState.savedBlockFile);
+
+ ReplicaInfo newReplicaInfo =
+ new FinalizedReplica(replicaInfo.getBlockId(),
+ replicaInfo.getBytesOnDisk(),
+ replicaInfo.getGenerationStamp(),
+ replicaState.lazyPersistVolume,
+ newBlockFile.getParentFile());
+
+ // Update the volumeMap entry. This removes the old entry.
+ volumeMap.add(replicaState.bpid, newReplicaInfo);
+
+ // Remove the old replicas from transient storage.
+ if (blockFile.delete() || !blockFile.exists()) {
+ ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(replicaState.bpid, blockFileUsed);
+ }
+ if (metaFile.delete() || !metaFile.exists()) {
+ ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(replicaState.bpid, metaFileUsed);
+ }
+
+ // If deletion failed then the directory scanner will cleanup the blocks
+ // eventually.
+ replicaState = lazyWriteReplicaTracker.getNextCandidateForEviction();
+ }
+ }
+
@Override
public void run() {
int numSuccessiveFailures = 0;
@@ -2459,11 +2454,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
while (fsRunning && shouldRun) {
try {
numSuccessiveFailures = saveNextReplica() ? 0 : (numSuccessiveFailures + 1);
+ evictBlocks();
// Sleep if we have no more work to do or if it looks like we are not
// making any forward progress. This is to ensure that if all persist
// operations are failing we don't keep retrying them in a tight loop.
- if (numSuccessiveFailures == lazyWriteReplicaTracker.numReplicasNotPersisted()) {
+ if (numSuccessiveFailures >= lazyWriteReplicaTracker.numReplicasNotPersisted()) {
Thread.sleep(checkpointerInterval * 1000);
numSuccessiveFailures = 0;
}
@@ -2471,7 +2467,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
LOG.info("LazyWriter was interrupted, exiting");
break;
} catch (Exception e) {
- LOG.error("Ignoring exception in LazyWriter:", e);
+ LOG.warn("Ignoring exception in LazyWriter:", e);
}
}
}
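The eviction trigger added above keys off two thresholds: free space below 10% of transient capacity, or below room for three default-sized blocks. A self-contained sketch of that check follows; the constants mirror the patch, while VolumeStats and the main() demo are illustrative stand-ins for FsVolumeImpl:

    import java.util.Arrays;
    import java.util.List;

    public class WatermarkSketch {
      static final int LOW_WATERMARK_FREE_SPACE_PERCENT = 10;
      static final int LOW_WATERMARK_FREE_SPACE_REPLICAS = 3;
      static final long ESTIMATE_BLOCK_SIZE = 128L * 1024 * 1024; // dfs.blocksize default

      static class VolumeStats {
        final boolean transientStorage;
        final long capacity;
        final long available;
        VolumeStats(boolean transientStorage, long capacity, long available) {
          this.transientStorage = transientStorage;
          this.capacity = capacity;
          this.available = available;
        }
      }

      // Mirrors transientFreeSpaceBelowThreshold(): aggregate all transient
      // volumes, then test the percentage and absolute-space watermarks.
      static boolean transientFreeSpaceBelowThreshold(List<VolumeStats> volumes) {
        long free = 0;
        long capacity = 0;
        for (VolumeStats v : volumes) {
          if (v.transientStorage) {
            capacity += v.capacity;
            free += v.available;
          }
        }
        if (capacity == 0) {
          return false; // no transient volumes configured, nothing to evict
        }
        int percentFree = (int) (free * 100 / capacity);
        return percentFree < LOW_WATERMARK_FREE_SPACE_PERCENT
            || free < ESTIMATE_BLOCK_SIZE * LOW_WATERMARK_FREE_SPACE_REPLICAS;
      }

      public static void main(String[] args) {
        List<VolumeStats> vols = Arrays.asList(
            new VolumeStats(true, 4L << 30, 256L << 20),    // 4 GB RAM disk, 256 MB free
            new VolumeStats(false, 100L << 30, 50L << 30)); // ordinary disk, ignored
        // 256 MB is 6% of 4 GB and less than 3 * 128 MB, so eviction kicks in.
        System.out.println(transientFreeSpaceBelowThreshold(vols)); // prints true
      }
    }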
http://git-wip-us.apache.org/repos/asf/hadoop/blob/21046d83/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java
index 222b63a..9f020c4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java
@@ -104,30 +104,23 @@ class LazyWriteReplicaTracker {
/**
* Queue of replicas that need to be written to disk.
+ * Stale entries are GC'd by dequeueNextReplicaToPersist.
*/
final Queue<ReplicaState> replicasNotPersisted;
/**
- * A map of blockId to persist complete time for transient blocks. This allows
- * us to evict LRU blocks from transient storage. Protected by 'this'
- * Object lock.
+ * Queue of replicas in the order in which they were persisted.
+ * We'll dequeue them in the same order.
+ * We can improve the eviction scheme later.
+ * Stale entries are GC'd by getNextCandidateForEviction.
*/
- final Map<ReplicaState, Long> replicasPersisted;
+ final Queue<ReplicaState> replicasPersisted;
LazyWriteReplicaTracker(final FsDatasetImpl fsDataset) {
this.fsDataset = fsDataset;
replicaMaps = new HashMap<String, Map<Long, ReplicaState>>();
replicasNotPersisted = new LinkedList<ReplicaState>();
- replicasPersisted = new HashMap<ReplicaState, Long>();
- }
-
- TreeMultimap<Long, ReplicaState> getLruMap() {
- // TODO: This can be made more efficient.
- TreeMultimap<Long, ReplicaState> reversedMap = TreeMultimap.create();
- for (Map.Entry<ReplicaState, Long> entry : replicasPersisted.entrySet()) {
- reversedMap.put(entry.getValue(), entry.getKey());
- }
- return reversedMap;
+ replicasPersisted = new LinkedList<ReplicaState>();
}
synchronized void addReplica(String bpid, long blockId,
@@ -171,7 +164,8 @@ class LazyWriteReplicaTracker {
// one.
replicasNotPersisted.remove(replicaState);
}
- replicasPersisted.put(replicaState, System.currentTimeMillis() / 1000);
+
+ replicasPersisted.add(replicaState);
}
synchronized ReplicaState dequeueNextReplicaToPersist() {
@@ -188,14 +182,36 @@ class LazyWriteReplicaTracker {
return null;
}
- synchronized void reenqueueReplica(final ReplicaState replicaState) {
+ synchronized void reenqueueReplicaNotPersisted(final ReplicaState replicaState) {
replicasNotPersisted.add(replicaState);
}
+ synchronized void reenqueueReplicaPersisted(final ReplicaState replicaState) {
+ replicasPersisted.add(replicaState);
+ }
+
synchronized int numReplicasNotPersisted() {
return replicasNotPersisted.size();
}
+ synchronized ReplicaState getNextCandidateForEviction() {
+ while (replicasPersisted.size() != 0) {
+ ReplicaState replicaState = replicasPersisted.remove();
+ Map<Long, ReplicaState> replicaMap = replicaMaps.get(replicaState.bpid);
+
+ if (replicaMap != null && replicaMap.get(replicaState.blockId) != null) {
+ return replicaState;
+ }
+
+ // The replica no longer exists, look for the next one.
+ }
+ return null;
+ }
+
+ void discardReplica(ReplicaState replicaState, boolean force) {
+ discardReplica(replicaState.bpid, replicaState.blockId, force);
+ }
+
synchronized void discardReplica(
final String bpid, final long blockId, boolean force) {
Map<Long, ReplicaState> map = replicaMaps.get(bpid);
@@ -221,9 +237,5 @@ class LazyWriteReplicaTracker {
}
map.remove(blockId);
- replicasPersisted.remove(replicaState);
-
- // Leave the replica in replicasNotPersisted if its present.
- // dequeueNextReplicaToPersist will GC it eventually.
}
}
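The tracker side of the change swaps the timestamp map for a plain FIFO queue: replicas are evicted in the order they were persisted, and discarded replicas are garbage-collected lazily when dequeued rather than removed from the queue eagerly. A standalone sketch of that bookkeeping, mirroring getNextCandidateForEviction() above; class and field names are illustrative:

    import java.util.HashMap;
    import java.util.LinkedList;
    import java.util.Map;
    import java.util.Queue;

    public class FifoEvictionSketch {
      private final Map<Long, String> liveReplicas = new HashMap<Long, String>();
      private final Queue<Long> persisted = new LinkedList<Long>();

      synchronized void recordPersisted(long blockId, String savedFile) {
        liveReplicas.put(blockId, savedFile);
        persisted.add(blockId); // queued in persist order
      }

      synchronized void discard(long blockId) {
        // Leave the queue entry in place; the dequeue loop below GCs it.
        liveReplicas.remove(blockId);
      }

      synchronized Long nextCandidateForEviction() {
        while (!persisted.isEmpty()) {
          Long blockId = persisted.remove();
          if (liveReplicas.containsKey(blockId)) {
            return blockId; // oldest persisted replica still present
          }
          // Stale entry (replica already discarded); keep looking.
        }
        return null;
      }

      public static void main(String[] args) {
        FifoEvictionSketch tracker = new FifoEvictionSketch();
        tracker.recordPersisted(1L, "blk_1");
        tracker.recordPersisted(2L, "blk_2");
        tracker.discard(1L);
        System.out.println(tracker.nextCandidateForEviction()); // 2: entry for 1 was GC'd
      }
    }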