You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2014/03/08 21:41:32 UTC

svn commit: r1575605 - /hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java

Author: stack
Date: Sat Mar  8 20:41:32 2014
New Revision: 1575605

URL: http://svn.apache.org/r1575605
Log:
HBASE-10651 Fix incorrect handling of IE that restores current thread's interrupt status within while/for loops in Replication (Honghua Feng)

Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1575605&r1=1575604&r2=1575605&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Sat Mar  8 20:41:32 2014
@@ -710,6 +710,9 @@ public class ReplicationSource extends T
             } catch (InterruptedException e) {
               LOG.debug("Interrupted while sleeping for throttling control");
               Thread.currentThread().interrupt();
+              // The current thread may have been interrupted in order to terminate it;
+              // go straight back to the while() condition to confirm this.
+              continue;
             }
             // reset throttler's cycle start tick when sleep for throttling occurs
             this.throttler.resetStartTick();
@@ -753,6 +756,11 @@ public class ReplicationSource extends T
                 + "Replication cannot proceed without losing data.", sleepMultiplier)) {
               sleepMultiplier++;
             }
+            // The current thread may have been interrupted in order to terminate it;
+            // go straight back to the while() condition to confirm this.
+            if (isInterrupted()) {
+              continue;
+            }
           }
         } else {
           if (ioe instanceof SocketTimeoutException) {
@@ -763,6 +771,11 @@ public class ReplicationSource extends T
               "call to the remote cluster timed out, which is usually " +
               "caused by a machine failure or a massive slowdown",
               this.socketTimeoutMultiplier);
+            // The current thread may have been interrupted in order to terminate it;
+            // go straight back to the while() condition to confirm this.
+            if (isInterrupted()) {
+              continue;
+            }
           } else if (ioe instanceof ConnectException) {
             LOG.warn("Peer is unavailable, rechecking all sinks: ", ioe);
             replicationSinkMgr.chooseSinks();
@@ -849,7 +862,8 @@ public class ReplicationSource extends T
           + " because an error occurred: " + reason, cause);
     }
     this.running = false;
-    Threads.shutdown(this, this.sleepForRetries);
+    this.interrupt();
+    Threads.shutdown(this, this.sleepForRetries * this.maxRetriesMultiplier);
   }
 
   public String getPeerClusterZnode() {
@@ -865,7 +879,7 @@ public class ReplicationSource extends T
   }
 
   private boolean isActive() {
-    return !this.stopper.isStopped() && this.running;
+    return !this.stopper.isStopped() && this.running && !isInterrupted();
   }
 
   /**