You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by nk...@apache.org on 2014/03/04 09:50:01 UTC

svn commit: r1573942 - in /hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver: HRegion.java MemStoreFlusher.java

Author: nkeywal
Date: Tue Mar  4 08:50:01 2014
New Revision: 1573942

URL: http://svn.apache.org/r1573942
Log:
HBASE-10650 Fix incorrect handling of IE that restores current thread's interrupt status within while/for loops in RegionServer (Feng Honghua)

Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1573942&r1=1573941&r2=1573942&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue Mar  4 08:50:01 2014
@@ -1130,13 +1130,21 @@ public class HRegion implements HeapSize
    */
   public void waitForFlushesAndCompactions() {
     synchronized (writestate) {
-      while (writestate.compacting > 0 || writestate.flushing) {
-        LOG.debug("waiting for " + writestate.compacting + " compactions"
+      boolean interrupted = false;
+      try {
+        while (writestate.compacting > 0 || writestate.flushing) {
+          LOG.debug("waiting for " + writestate.compacting + " compactions"
             + (writestate.flushing ? " & cache flush" : "") + " to complete for region " + this);
-        try {
-          writestate.wait();
-        } catch (InterruptedException iex) {
-          // essentially ignore and propagate the interrupt back up
+          try {
+            writestate.wait();
+          } catch (InterruptedException iex) {
+            // essentially ignore and propagate the interrupt back up
+            LOG.warn("Interrupted while waiting");
+            interrupted = true;
+          }
+        }
+      } finally {
+        if (interrupted) {
           Thread.currentThread().interrupt();
         }
       }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java?rev=1573942&r1=1573941&r2=1573942&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java Tue Mar  4 08:50:01 2014
@@ -548,27 +548,36 @@ class MemStoreFlusher implements FlushRe
       synchronized (this.blockSignal) {
         boolean blocked = false;
         long startTime = 0;
-        while (isAboveHighWaterMark() && !server.isStopped()) {
-          if (!blocked) {
-            startTime = EnvironmentEdgeManager.currentTimeMillis();
-            LOG.info("Blocking updates on " + server.toString() +
-            ": the global memstore size " +
-            StringUtils.humanReadableInt(server.getRegionServerAccounting().getGlobalMemstoreSize()) +
-            " is >= than blocking " +
-            StringUtils.humanReadableInt(globalMemStoreLimit) + " size");
+        boolean interrupted = false;
+        try {
+          while (isAboveHighWaterMark() && !server.isStopped()) {
+            if (!blocked) {
+              startTime = EnvironmentEdgeManager.currentTimeMillis();
+              LOG.info("Blocking updates on " + server.toString() +
+                ": the global memstore size " +
+                StringUtils.humanReadableInt(server.getRegionServerAccounting().getGlobalMemstoreSize()) +
+                " is >= than blocking " +
+                StringUtils.humanReadableInt(globalMemStoreLimit) + " size");
+            }
+            blocked = true;
+            wakeupFlushThread();
+            try {
+              // we should be able to wait forever, but we've seen a bug where
+              // we miss a notify, so put a 5 second bound on it at least.
+              blockSignal.wait(5 * 1000);
+            } catch (InterruptedException ie) {
+              LOG.warn("Interrupted while waiting");
+              interrupted = true;
+            }
+            long took = System.currentTimeMillis() - start;
+            LOG.warn("Memstore is above high water mark and block " + took + "ms");
           }
-          blocked = true;
-          wakeupFlushThread();
-          try {
-            // we should be able to wait forever, but we've seen a bug where
-            // we miss a notify, so put a 5 second bound on it at least.
-            blockSignal.wait(5 * 1000);
-          } catch (InterruptedException ie) {
+        } finally {
+          if (interrupted) {
             Thread.currentThread().interrupt();
           }
-          long took = System.currentTimeMillis() - start;
-          LOG.warn("Memstore is above high water mark and block " + took + "ms");
         }
+
         if(blocked){
           final long totalTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
           if(totalTime > 0){