You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by um...@apache.org on 2014/08/14 06:21:12 UTC

svn commit: r1617873 - in /hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: CHANGES.txt src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java

Author: umamahesh
Date: Thu Aug 14 04:21:11 2014
New Revision: 1617873

URL: http://svn.apache.org/r1617873
Log:
Merge. HDFS-6783. Fix HDFS CacheReplicationMonitor rescan logic. Contributed by Yi Liu.

Modified:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1617873&r1=1617872&r2=1617873&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu Aug 14 04:21:11 2014
@@ -248,6 +248,8 @@ Release 2.6.0 - UNRELEASED
     HDFS-6830. BlockInfo.addStorage fails when DN changes the storage for a
     block replica (Arpit Agarwal)
 
+    HDFS-6783. Fix HDFS CacheReplicationMonitor rescan logic. (Yi Liu via umamahesh)
+
 Release 2.5.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java?rev=1617873&r1=1617872&r2=1617873&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java Thu Aug 14 04:21:11 2014
@@ -104,21 +104,21 @@ public class CacheReplicationMonitor ext
   private final Condition scanFinished;
 
   /**
-   * Whether there are pending CacheManager operations that necessitate a
-   * CacheReplicationMonitor rescan. Protected by the CRM lock.
+   * The number of rescans completed. Used to wait for scans to finish.
+   * Protected by the CacheReplicationMonitor lock.
    */
-  private boolean needsRescan = true;
+  private long completedScanCount = 0;
 
   /**
-   * Whether we are currently doing a rescan. Protected by the CRM lock.
+   * The scan we're currently performing, or -1 if no scan is in progress.
+   * Protected by the CacheReplicationMonitor lock.
    */
-  private boolean isScanning = false;
+  private long curScanCount = -1;
 
   /**
-   * The number of rescans completed. Used to wait for scans to finish.
-   * Protected by the CacheReplicationMonitor lock.
+   * The number of rescans we need to complete.  Protected by the CRM lock.
    */
-  private long scanCount = 0;
+  private long neededScanCount = 0;
 
   /**
    * True if this monitor should terminate. Protected by the CRM lock.
@@ -169,7 +169,7 @@ public class CacheReplicationMonitor ext
               LOG.info("Shutting down CacheReplicationMonitor");
               return;
             }
-            if (needsRescan) {
+            if (completedScanCount < neededScanCount) {
               LOG.info("Rescanning because of pending operations");
               break;
             }
@@ -182,8 +182,6 @@ public class CacheReplicationMonitor ext
             doRescan.await(delta, TimeUnit.MILLISECONDS);
             curTimeMs = Time.monotonicNow();
           }
-          isScanning = true;
-          needsRescan = false;
         } finally {
           lock.unlock();
         }
@@ -194,8 +192,8 @@ public class CacheReplicationMonitor ext
         // Update synchronization-related variables.
         lock.lock();
         try {
-          isScanning = false;
-          scanCount++;
+          completedScanCount = curScanCount;
+          curScanCount = -1;
           scanFinished.signalAll();
         } finally {
           lock.unlock();
@@ -226,16 +224,15 @@ public class CacheReplicationMonitor ext
         "Must not hold the FSN write lock when waiting for a rescan.");
     Preconditions.checkArgument(lock.isHeldByCurrentThread(),
         "Must hold the CRM lock when waiting for a rescan.");
-    if (!needsRescan) {
+    if (neededScanCount <= completedScanCount) {
       return;
     }
     // If no scan is already ongoing, kick the monitor thread so it starts one
-    if (!isScanning) {
+    if (curScanCount < 0) {
       doRescan.signal();
     }
     // Wait until the scan finishes and the count advances
-    final long startCount = scanCount;
-    while ((!shutdown) && (startCount >= scanCount)) {
+    while ((!shutdown) && (completedScanCount < neededScanCount)) {
       try {
         scanFinished.await();
       } catch (InterruptedException e) {
@@ -253,7 +250,14 @@ public class CacheReplicationMonitor ext
   public void setNeedsRescan() {
     Preconditions.checkArgument(lock.isHeldByCurrentThread(),
         "Must hold the CRM lock when setting the needsRescan bit.");
-    this.needsRescan = true;
+    if (curScanCount >= 0) {
+      // If there is a scan in progress, we need to wait for the scan after
+      // that.
+      neededScanCount = curScanCount + 1;
+    } else {
+      // If there is no scan in progress, we need to wait for the next scan.
+      neededScanCount = completedScanCount + 1;
+    }
   }
 
   /**
@@ -284,10 +288,17 @@ public class CacheReplicationMonitor ext
     scannedBlocks = 0;
     namesystem.writeLock();
     try {
+      lock.lock();
       if (shutdown) {
         throw new InterruptedException("CacheReplicationMonitor was " +
             "shut down.");
       }
+      curScanCount = completedScanCount + 1;
+    }
+    finally {
+      lock.unlock();
+    }
+    try {
       resetStatistics();
       rescanCacheDirectives();
       rescanCachedBlockMap();