You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2014/02/25 07:32:01 UTC

svn commit: r1571582 - /hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java

Author: stack
Date: Tue Feb 25 06:32:01 2014
New Revision: 1571582

URL: http://svn.apache.org/r1571582
Log:
HBASE-10575 ReplicationSource thread can't be terminated if it runs into the loop to contact peer's zk ensemble and fails continuously

Modified:
    hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java

Modified: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1571582&r1=1571581&r2=1571582&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Tue Feb 25 06:32:01 2014
@@ -191,24 +191,43 @@ public class ReplicationSource extends T
     }
   }
 
+  private void uninitialize() {
+    if (this.conn != null) {
+      try {
+        this.conn.close();
+      } catch (IOException e) {
+        LOG.debug("Attempt to close connection failed", e);
+      }
+    }
+    LOG.debug("Source exiting " + this.peerId);
+    metrics.clear();
+  }
+
   @Override
   public void run() {
     connectToPeers();
     // We were stopped while looping to connect to sinks, just abort
     if (!this.isActive()) {
-      metrics.clear();
+      uninitialize();
       return;
     }
+
     int sleepMultiplier = 1;
     // delay this until we are in an asynchronous thread
-    while (this.peerClusterId == null) {
+    while (this.isActive() && this.peerClusterId == null) {
       this.peerClusterId = replicationPeers.getPeerUUID(this.peerId);
-      if (this.peerClusterId == null) {
+      if (this.isActive() && this.peerClusterId == null) {
         if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
           sleepMultiplier++;
         }
       }
     }
+    // We were stopped while looping to contact peer's zk ensemble, just abort
+    if (!this.isActive()) {
+      uninitialize();
+      return;
+    }
+
     // resetting to 1 to reuse later
     sleepMultiplier = 1;
 
@@ -349,15 +368,7 @@ public class ReplicationSource extends T
       sleepMultiplier = 1;
       shipEdits(currentWALisBeingWrittenTo, entries);
     }
-    if (this.conn != null) {
-      try {
-        this.conn.close();
-      } catch (IOException e) {
-        LOG.debug("Attempt to close connection failed", e);
-      }
-    }
-    LOG.debug("Source exiting " + this.peerId);
-    metrics.clear();
+    uninitialize();
   }
 
   /**