You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2014/03/08 21:41:32 UTC
svn commit: r1575605 -
/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
Author: stack
Date: Sat Mar 8 20:41:32 2014
New Revision: 1575605
URL: http://svn.apache.org/r1575605
Log:
HBASE-10651 Fix incorrect handling of IE that restores current thread's interrupt status within while/for loops in Replication (Honghua Feng)
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1575605&r1=1575604&r2=1575605&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Sat Mar 8 20:41:32 2014
@@ -710,6 +710,9 @@ public class ReplicationSource extends T
} catch (InterruptedException e) {
LOG.debug("Interrupted while sleeping for throttling control");
Thread.currentThread().interrupt();
+ // current thread might be interrupted to terminate
+ // directly go back to while() for confirm this
+ continue;
}
// reset throttler's cycle start tick when sleep for throttling occurs
this.throttler.resetStartTick();
@@ -753,6 +756,11 @@ public class ReplicationSource extends T
+ "Replication cannot proceed without losing data.", sleepMultiplier)) {
sleepMultiplier++;
}
+ // current thread might be interrupted to terminate
+ // directly go back to while() for confirm this
+ if (isInterrupted()) {
+ continue;
+ }
}
} else {
if (ioe instanceof SocketTimeoutException) {
@@ -763,6 +771,11 @@ public class ReplicationSource extends T
"call to the remote cluster timed out, which is usually " +
"caused by a machine failure or a massive slowdown",
this.socketTimeoutMultiplier);
+ // current thread might be interrupted to terminate
+ // directly go back to while() for confirm this
+ if (isInterrupted()) {
+ continue;
+ }
} else if (ioe instanceof ConnectException) {
LOG.warn("Peer is unavailable, rechecking all sinks: ", ioe);
replicationSinkMgr.chooseSinks();
@@ -849,7 +862,8 @@ public class ReplicationSource extends T
+ " because an error occurred: " + reason, cause);
}
this.running = false;
- Threads.shutdown(this, this.sleepForRetries);
+ this.interrupt();
+ Threads.shutdown(this, this.sleepForRetries * this.maxRetriesMultiplier);
}
public String getPeerClusterZnode() {
@@ -865,7 +879,7 @@ public class ReplicationSource extends T
}
private boolean isActive() {
- return !this.stopper.isStopped() && this.running;
+ return !this.stopper.isStopped() && this.running && !isInterrupted();
}
/**