You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2014/10/17 23:45:02 UTC

[15/34] git commit: HDFS-6930. Improve replica eviction from RAM disk. (Arpit Agarwal)

HDFS-6930. Improve replica eviction from RAM disk. (Arpit Agarwal)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/21046d83
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/21046d83
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/21046d83

Branch: refs/heads/branch-2
Commit: 21046d83104948e2211472cfd720c5857b0b9748
Parents: 225ffdb
Author: arp <ar...@apache.org>
Authored: Wed Sep 3 13:53:01 2014 -0700
Committer: Jitendra Pandey <Ji...@Jitendra-Pandeys-MacBook-Pro-4.local>
Committed: Fri Oct 17 13:42:01 2014 -0700

----------------------------------------------------------------------
 .../datanode/fsdataset/impl/FsDatasetImpl.java  | 214 +++++++++----------
 .../fsdataset/impl/LazyWriteReplicaTracker.java |  52 +++--
 2 files changed, 137 insertions(+), 129 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/21046d83/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 797dcc6..8d93dcc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -996,83 +996,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
   }
 
-  /**
-   * Attempt to evict one or more transient block replicas we have at least
-   * spaceNeeded bytes free.
-   *
-   * @return true if we were able to free up at least spaceNeeded bytes, false
-   *          otherwise.
-   */
-  private boolean tryToEvictBlocks(final String bpid, final long spaceNeeded)
-      throws IOException {
-
-    boolean isAvailable = false;
-
-    LOG.info("Attempting to evict blocks from transient storage");
-
-    // Reverse the map so we can iterate in order of replica creation times,
-    // evicting oldest replicas one at a time until we have sufficient space.
-    TreeMultimap<Long, LazyWriteReplicaTracker.ReplicaState> lruMap =
-        lazyWriteReplicaTracker.getLruMap();
-    int blocksEvicted = 0;
-
-    // TODO: It is really inefficient to do this with the Object lock held!
-    // TODO: This logic is here just for prototyping.
-    // TODO: We should replace it with proactive discard when ram_disk free space
-    // TODO:   falls below a low watermark. That way we avoid fs operations on the
-    // TODO:   hot path with the lock held.
-    synchronized (this) {
-      long currentTime = System.currentTimeMillis() / 1000;
-      for (Map.Entry<Long, LazyWriteReplicaTracker.ReplicaState> entry : lruMap.entries()) {
-        LazyWriteReplicaTracker.ReplicaState lazyWriteReplica = entry.getValue();
-        LOG.info("RAM_DISK: Evicting blockId=" + lazyWriteReplica.blockId +
-                     "; block LMT=" + entry.getKey() +
-                     "; currentTime=" + currentTime);
-        ReplicaInfo replicaInfo = getReplicaInfo(bpid, lazyWriteReplica.blockId);
-        Preconditions.checkState(replicaInfo.getVolume().isTransientStorage());
-        File blockFile = replicaInfo.getBlockFile();
-        File metaFile = replicaInfo.getMetaFile();
-        long used = blockFile.length() + metaFile.length();
-        lazyWriteReplicaTracker.discardReplica(bpid, entry.getValue().blockId, false);
-
-        // Move the persisted replica to the finalized directory of
-        // the target volume.
-        BlockPoolSlice bpSlice =
-            lazyWriteReplica.lazyPersistVolume.getBlockPoolSlice(bpid);
-        File newBlockFile = bpSlice.activateSavedReplica(
-            replicaInfo, lazyWriteReplica.savedBlockFile);
-
-        ReplicaInfo newReplicaInfo =
-            new FinalizedReplica(replicaInfo.getBlockId(),
-                                 replicaInfo.getBytesOnDisk(),
-                                 replicaInfo.getGenerationStamp(),
-                                 lazyWriteReplica.lazyPersistVolume,
-                                 newBlockFile.getParentFile());
-
-        // Update the volumeMap entry. This removes the old entry.
-        volumeMap.add(bpid, newReplicaInfo);
-
-        // Remove the old replicas.
-        blockFile.delete();
-        metaFile.delete();
-        ((FsVolumeImpl) replicaInfo.getVolume()).decDfsUsed(bpid, used);
-        ++blocksEvicted;
-
-        if (replicaInfo.getVolume().getAvailable() > spaceNeeded) {
-          LOG.info("RAM_DISK: freed up " + spaceNeeded + " bytes for new block");
-          isAvailable = true;
-          break;
-        }
-
-        if (blocksEvicted == MAX_BLOCK_EVICTIONS_PER_ITERATION) {
-          break;
-        }
-      }
-    }
-
-    return isAvailable;
-  }
-
   @Override // FsDatasetSpi
   public synchronized ReplicaInPipeline createRbw(StorageType storageType,
       ExtendedBlock b, boolean allowLazyPersist) throws IOException {
@@ -1095,13 +1018,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         }
       } catch (DiskOutOfSpaceException de) {
         if (allowLazyPersist) {
-          if (!tryToEvictBlocks(b.getBlockPoolId(), b.getNumBytes())) {
-            // Eviction did not work, we'll just fallback to DEFAULT storage.
-            LOG.info("RAM_DISK: Failed to free up " + b.getNumBytes() +
-                         " bytes for new block. Will fallback to DEFAULT " +
-                         "storage");
-            allowLazyPersist = false;
-          }
+          allowLazyPersist = false;
           continue;
         }
         throw de;
@@ -2376,20 +2293,26 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   class LazyWriter implements Runnable {
     private volatile boolean shouldRun = true;
     final int checkpointerInterval;
+    final long estimateBlockSize;
+
+    public static final int LOW_WATERMARK_FREE_SPACE_PERCENT = 10;
+    public static final int LOW_WATERMARK_FREE_SPACE_REPLICAS = 3;
+
 
     public LazyWriter(final int checkpointerInterval) {
       this.checkpointerInterval = checkpointerInterval;
+      this.estimateBlockSize = conf.getLongBytes(
+          DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
+          DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
     }
 
     private void moveReplicaToNewVolume(String bpid, long blockId)
         throws IOException {
 
-      LOG.info("LazyWriter invoked to save blockId=" + blockId + "; bpid=" + bpid);
-
       FsVolumeImpl targetVolume;
       ReplicaInfo replicaInfo;
 
-      synchronized (this) {
+      synchronized (FsDatasetImpl.this) {
         replicaInfo = volumeMap.get(bpid, blockId);
 
         if (replicaInfo == null || !replicaInfo.getVolume().isTransientStorage()) {
@@ -2403,20 +2326,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         // Pick a target volume for the block.
         targetVolume = volumes.getNextVolume(
             StorageType.DEFAULT, replicaInfo.getNumBytes());
-      }
 
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("LazyWriter starting to save blockId=" + blockId + "; bpid=" + bpid);
-      }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("LazyWriter starting to save blockId=" + blockId + "; bpid=" + bpid);
+        }
 
-      lazyWriteReplicaTracker.recordStartLazyPersist(bpid, blockId, targetVolume);
-      File savedBlockFile = targetVolume.getBlockPoolSlice(bpid)
-                                        .lazyPersistReplica(replicaInfo);
-      lazyWriteReplicaTracker.recordEndLazyPersist(bpid, blockId, savedBlockFile);
+        lazyWriteReplicaTracker.recordStartLazyPersist(bpid, blockId, targetVolume);
+        File savedBlockFile = targetVolume.getBlockPoolSlice(bpid)
+                                          .lazyPersistReplica(replicaInfo);
+        lazyWriteReplicaTracker.recordEndLazyPersist(bpid, blockId, savedBlockFile);
 
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("LazyWriter finished saving blockId=" + blockId + "; bpid=" + bpid +
-                 " to file " + savedBlockFile);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("LazyWriter finished saving blockId=" + blockId + "; bpid=" + bpid +
+                        " to file " + savedBlockFile);
+        }
       }
     }
 
@@ -2430,28 +2353,100 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       boolean succeeded = false;
 
       try {
-        synchronized (this) {
-          replicaState = lazyWriteReplicaTracker.dequeueNextReplicaToPersist();
-          if (replicaState == null) {
-            return false;
-          }
+        replicaState = lazyWriteReplicaTracker.dequeueNextReplicaToPersist();
+        if (replicaState != null) {
+          // Move the replica outside the lock.
+          moveReplicaToNewVolume(replicaState.bpid, replicaState.blockId);
         }
-
-        // Move the replica outside the lock.
-        moveReplicaToNewVolume(replicaState.bpid, replicaState.blockId);
         succeeded = true;
       } catch(IOException ioe) {
         LOG.warn("Exception saving replica " + replicaState, ioe);
       } finally {
         if (!succeeded && replicaState != null) {
           LOG.warn("Failed to save replica " + replicaState + ". re-enqueueing it.");
-          lazyWriteReplicaTracker.reenqueueReplica(replicaState);
+          lazyWriteReplicaTracker.reenqueueReplicaNotPersisted(replicaState);
         }
       }
 
       return succeeded;
     }
 
+    private boolean transientFreeSpaceBelowThreshold() throws IOException {
+      long free = 0;
+      long capacity = 0;
+
+      // Don't worry about fragmentation for now. We don't expect more than one
+      // transient volume per DN.
+      for (FsVolumeImpl v : volumes.volumes) {
+        if (v.isTransientStorage()) {
+          capacity += v.getCapacity();
+          free += v.getAvailable();
+        }
+      }
+
+      if (capacity == 0) {
+        return false;
+      }
+
+      int percentFree = (int) (free * 100 / capacity);
+      return percentFree < LOW_WATERMARK_FREE_SPACE_PERCENT ||
+             free < (estimateBlockSize * LOW_WATERMARK_FREE_SPACE_REPLICAS);
+    }
+
+    /**
+     * Evict persisted replicas from transient storage, oldest persisted first,
+     * while free space is below the low watermark; at most
+     * MAX_BLOCK_EVICTIONS_PER_ITERATION per call. A candidate is dequeued only
+     * after both checks pass, so it is never dropped from the queue unevicted.
+     */
+    private void evictBlocks() throws IOException {
+      int iterations = 0;
+      while (iterations++ < MAX_BLOCK_EVICTIONS_PER_ITERATION &&
+             transientFreeSpaceBelowThreshold()) {
+        // Take the dataset lock (not this LazyWriter's): volumeMap is updated.
+        synchronized (FsDatasetImpl.this) {
+          LazyWriteReplicaTracker.ReplicaState replicaState =
+              lazyWriteReplicaTracker.getNextCandidateForEviction();
+          if (replicaState == null) {
+            return;  // Nothing left to evict.
+          }
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Evicting block " + replicaState);
+          }
+          ReplicaInfo replicaInfo = getReplicaInfo(replicaState.bpid, replicaState.blockId);
+          Preconditions.checkState(replicaInfo.getVolume().isTransientStorage());
+          File blockFile = replicaInfo.getBlockFile();
+          File metaFile = replicaInfo.getMetaFile();
+          long blockFileUsed = blockFile.length();
+          long metaFileUsed = metaFile.length();
+          lazyWriteReplicaTracker.discardReplica(replicaState, false);
+          // Move the replica from lazyPersist/ to finalized/ on target volume
+          BlockPoolSlice bpSlice =
+              replicaState.lazyPersistVolume.getBlockPoolSlice(replicaState.bpid);
+          File newBlockFile = bpSlice.activateSavedReplica(
+              replicaInfo, replicaState.savedBlockFile);
+          ReplicaInfo newReplicaInfo =
+              new FinalizedReplica(replicaInfo.getBlockId(),
+                                   replicaInfo.getBytesOnDisk(),
+                                   replicaInfo.getGenerationStamp(),
+                                   replicaState.lazyPersistVolume,
+                                   newBlockFile.getParentFile());
+          // Update the volumeMap entry. This removes the old entry.
+          volumeMap.add(replicaState.bpid, newReplicaInfo);
+
+          // Remove the old replica files from transient storage. If deletion
+          // fails, the directory scanner will clean them up eventually.
+          FsVolumeImpl transientVolume = (FsVolumeImpl) replicaInfo.getVolume();
+          if (blockFile.delete() || !blockFile.exists()) {
+            transientVolume.decDfsUsed(replicaState.bpid, blockFileUsed);
+          }
+          if (metaFile.delete() || !metaFile.exists()) {
+            transientVolume.decDfsUsed(replicaState.bpid, metaFileUsed);
+          }
+        }
+      }
+    }
+
     @Override
     public void run() {
       int numSuccessiveFailures = 0;
@@ -2459,11 +2454,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       while (fsRunning && shouldRun) {
         try {
           numSuccessiveFailures = saveNextReplica() ? 0 : (numSuccessiveFailures + 1);
+          evictBlocks();
 
           // Sleep if we have no more work to do or if it looks like we are not
           // making any forward progress. This is to ensure that if all persist
           // operations are failing we don't keep retrying them in a tight loop.
-          if (numSuccessiveFailures == lazyWriteReplicaTracker.numReplicasNotPersisted()) {
+          if (numSuccessiveFailures >= lazyWriteReplicaTracker.numReplicasNotPersisted()) {
             Thread.sleep(checkpointerInterval * 1000);
             numSuccessiveFailures = 0;
           }
@@ -2471,7 +2467,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           LOG.info("LazyWriter was interrupted, exiting");
           break;
         } catch (Exception e) {
-          LOG.error("Ignoring exception in LazyWriter:", e);
+          LOG.warn("Ignoring exception in LazyWriter:", e);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/21046d83/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java
index 222b63a..9f020c4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/LazyWriteReplicaTracker.java
@@ -104,30 +104,23 @@ class LazyWriteReplicaTracker {
 
   /**
    * Queue of replicas that need to be written to disk.
+   * Stale entries are GC'd by dequeueNextReplicaToPersist.
    */
   final Queue<ReplicaState> replicasNotPersisted;
 
   /**
-   * A map of blockId to persist complete time for transient blocks. This allows
-   * us to evict LRU blocks from transient storage. Protected by 'this'
-   * Object lock.
+   * Queue of replicas in the order in which they were persisted.
+   * We'll dequeue them in the same order.
+   * We can improve the eviction scheme later.
+   * Stale entries are GC'd by getNextCandidateForEviction.
    */
-  final Map<ReplicaState, Long> replicasPersisted;
+  final Queue<ReplicaState> replicasPersisted;
 
   LazyWriteReplicaTracker(final FsDatasetImpl fsDataset) {
     this.fsDataset = fsDataset;
     replicaMaps = new HashMap<String, Map<Long, ReplicaState>>();
     replicasNotPersisted = new LinkedList<ReplicaState>();
-    replicasPersisted = new HashMap<ReplicaState, Long>();
-  }
-
-  TreeMultimap<Long, ReplicaState> getLruMap() {
-    // TODO: This can be made more efficient.
-    TreeMultimap<Long, ReplicaState> reversedMap = TreeMultimap.create();
-    for (Map.Entry<ReplicaState, Long> entry : replicasPersisted.entrySet()) {
-      reversedMap.put(entry.getValue(), entry.getKey());
-    }
-    return reversedMap;
+    replicasPersisted = new LinkedList<ReplicaState>();
   }
 
   synchronized void addReplica(String bpid, long blockId,
@@ -171,7 +164,8 @@ class LazyWriteReplicaTracker {
       // one.
       replicasNotPersisted.remove(replicaState);
     }
-    replicasPersisted.put(replicaState, System.currentTimeMillis() / 1000);
+
+    replicasPersisted.add(replicaState);
   }
 
   synchronized ReplicaState dequeueNextReplicaToPersist() {
@@ -188,14 +182,36 @@ class LazyWriteReplicaTracker {
     return null;
   }
 
-  synchronized void reenqueueReplica(final ReplicaState replicaState) {
+  synchronized void reenqueueReplicaNotPersisted(final ReplicaState replicaState) {
     replicasNotPersisted.add(replicaState);
   }
 
+  synchronized void reenqueueReplicaPersisted(final ReplicaState replicaState) {
+    replicasPersisted.offer(replicaState);  // append to the tail of the eviction queue
+  }
+
   synchronized int numReplicasNotPersisted() {
     return replicasNotPersisted.size();
   }
 
+  /** Dequeue the oldest persisted replica still tracked; stale entries are dropped. */
+  synchronized ReplicaState getNextCandidateForEviction() {
+    while (!replicasPersisted.isEmpty()) {
+      ReplicaState candidate = replicasPersisted.remove();
+      Map<Long, ReplicaState> blockMap = replicaMaps.get(candidate.bpid);
+      boolean stillTracked =
+          blockMap != null && blockMap.get(candidate.blockId) != null;
+      if (stillTracked) {
+        return candidate;
+      }
+    }
+    return null;
+  }
+
+  void discardReplica(final ReplicaState replicaState, final boolean force) {
+    discardReplica(replicaState.bpid, replicaState.blockId, force);  // delegate to the synchronized overload
+  }
+
   synchronized void discardReplica(
       final String bpid, final long blockId, boolean force) {
     Map<Long, ReplicaState> map = replicaMaps.get(bpid);
@@ -221,9 +237,5 @@ class LazyWriteReplicaTracker {
     }
 
     map.remove(blockId);
-    replicasPersisted.remove(replicaState);
-
-    // Leave the replica in replicasNotPersisted if its present.
-    // dequeueNextReplicaToPersist will GC it eventually.
   }
 }