You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2014/02/25 07:32:01 UTC
svn commit: r1571582 -
/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
Author: stack
Date: Tue Feb 25 06:32:01 2014
New Revision: 1571582
URL: http://svn.apache.org/r1571582
Log:
HBASE-10575 ReplicationSource thread can't be terminated if it runs into the loop to contact peer's zk ensemble and fails continuously
Modified:
hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
Modified: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1571582&r1=1571581&r2=1571582&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Tue Feb 25 06:32:01 2014
@@ -191,24 +191,43 @@ public class ReplicationSource extends T
}
}
+ private void uninitialize() {
+ if (this.conn != null) {
+ try {
+ this.conn.close();
+ } catch (IOException e) {
+ LOG.debug("Attempt to close connection failed", e);
+ }
+ }
+ LOG.debug("Source exiting " + this.peerId);
+ metrics.clear();
+ }
+
@Override
public void run() {
connectToPeers();
// We were stopped while looping to connect to sinks, just abort
if (!this.isActive()) {
- metrics.clear();
+ uninitialize();
return;
}
+
int sleepMultiplier = 1;
// delay this until we are in an asynchronous thread
- while (this.peerClusterId == null) {
+ while (this.isActive() && this.peerClusterId == null) {
this.peerClusterId = replicationPeers.getPeerUUID(this.peerId);
- if (this.peerClusterId == null) {
+ if (this.isActive() && this.peerClusterId == null) {
if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
sleepMultiplier++;
}
}
}
+ // We were stopped while looping to contact peer's zk ensemble, just abort
+ if (!this.isActive()) {
+ uninitialize();
+ return;
+ }
+
// resetting to 1 to reuse later
sleepMultiplier = 1;
@@ -349,15 +368,7 @@ public class ReplicationSource extends T
sleepMultiplier = 1;
shipEdits(currentWALisBeingWrittenTo, entries);
}
- if (this.conn != null) {
- try {
- this.conn.close();
- } catch (IOException e) {
- LOG.debug("Attempt to close connection failed", e);
- }
- }
- LOG.debug("Source exiting " + this.peerId);
- metrics.clear();
+ uninitialize();
}
/**