Posted to issues@hbase.apache.org by GitBox <gi...@apache.org> on 2020/09/04 04:30:17 UTC

[GitHub] [hbase] busbey commented on a change in pull request #2191: HBASE-24813 ReplicationSource should clear buffer usage on Replicatio…

busbey commented on a change in pull request #2191:
URL: https://github.com/apache/hbase/pull/2191#discussion_r483374802



##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
##########
@@ -626,14 +626,10 @@ public void terminate(String reason, Exception cause, boolean clearMetrics, bool
       Threads.shutdown(initThread, this.sleepForRetries);
     }
     Collection<ReplicationSourceShipper> workers = workerThreads.values();
-    for (ReplicationSourceShipper worker : workers) {
-      worker.stopWorker();
-      if(worker.entryReader != null) {
-        worker.entryReader.setReaderRunning(false);
-      }
-    }
 
     for (ReplicationSourceShipper worker : workers) {
+      worker.stopWorker();
+      worker.entryReader.setReaderRunning(false);

Review comment:
       Don't we still need the null check on `worker.entryReader`?

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
##########
@@ -324,4 +326,39 @@ void stopWorker() {
   public boolean isFinished() {
     return state == WorkerState.FINISHED;
   }
+
+  /**
+   * Attempts to properly update <code>ReplicationSourceManager.totalBufferUser</code>,
+   * in case there were unprocessed entries batched by the reader to the shipper,
+   * but the shipper didn't manage to ship those because the replication source is being terminated.
+   * In that case, it iterates through the batched entries and decrease the pending
+   * entries size from <code>ReplicationSourceManager.totalBufferUser</code>
+   * <p/>
+   * <b>NOTE</b> This method should be only called upon replication source termination.
+   * It blocks waiting for both shipper and reader threads termination,
+   * to make sure no race conditions
+   * when updating <code>ReplicationSourceManager.totalBufferUser</code>.
+   */
+  void clearWALEntryBatch() {
+    while(this.isAlive() || this.entryReader.isAlive()){
+      try {
+        // Wait both shipper and reader threads to stop
+        Thread.sleep(this.sleepForRetries);
+      } catch (InterruptedException e) {
+        LOG.info("{} Interrupted while waiting {} to stop on clearWALEntryBatch",
+          this.source.getPeerId(), this.getName());
+        Thread.currentThread().interrupt();
+      }
+    }

Review comment:
       Should we have a timeout here? Or is there a timeout above us that will interrupt this wait if it takes too long?
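       For illustration, a minimal sketch of a bounded wait, assuming a single overall deadline shared by all threads is acceptable. `BoundedWait`, `awaitTermination`, and `timeoutMs` are hypothetical names that do not exist in HBase; only `Thread.join(long)` and `Thread.isAlive()` are standard JDK calls.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper, not part of HBase: waits for threads with an overall deadline. */
public final class BoundedWait {
  private BoundedWait() {
  }

  /**
   * Waits until every given thread has terminated, the timeout elapses,
   * or the calling thread is interrupted, whichever comes first.
   *
   * @return true only if all threads were observed to have terminated
   */
  public static boolean awaitTermination(long timeoutMs, Thread... threads) {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    for (Thread t : threads) {
      long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
      if (remainingMs <= 0) {
        // Thread.join(0) means "wait forever", so only check liveness once the budget is spent.
        if (t.isAlive()) {
          return false;
        }
        continue;
      }
      try {
        t.join(remainingMs);
      } catch (InterruptedException e) {
        // Restore the interrupt status and report that the wait did not complete.
        Thread.currentThread().interrupt();
        return false;
      }
      if (t.isAlive()) {
        return false;
      }
    }
    return true;
  }
}
```

       With something like this, the caller could skip the buffer update (and log a warning) whenever the method returns false, instead of looping without bound.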

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
##########
@@ -324,4 +326,39 @@ void stopWorker() {
   public boolean isFinished() {
     return state == WorkerState.FINISHED;
   }
+
+  /**
+   * Attempts to properly update <code>ReplicationSourceManager.totalBufferUser</code>,
+   * in case there were unprocessed entries batched by the reader to the shipper,
+   * but the shipper didn't manage to ship those because the replication source is being terminated.
+   * In that case, it iterates through the batched entries and decrease the pending
+   * entries size from <code>ReplicationSourceManager.totalBufferUser</code>
+   * <p/>
+   * <b>NOTE</b> This method should be only called upon replication source termination.
+   * It blocks waiting for both shipper and reader threads termination,
+   * to make sure no race conditions
+   * when updating <code>ReplicationSourceManager.totalBufferUser</code>.

Review comment:
       Expressly note that both the worker and the entry reader must already have been interrupted by the caller, because we're not interrupting them here.

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
##########
@@ -651,6 +647,20 @@ public void terminate(String reason, Exception cause, boolean clearMetrics, bool
           worker.entryReader.interrupt();
         }
       }
+      //block this thread until worker thread is interrupted
+      while(worker.isAlive()){
+        try {
+          // Wait worker to stop
+          Thread.sleep(this.sleepForRetries);
+        } catch (InterruptedException e) {
+          LOG.info("{} Interrupted while waiting {} to stop", logPeerId(), worker.getName());
+          Thread.currentThread().interrupt();
+        }
+      }
+      //If worker is already stopped but there was still entries batched,
+      //we need to clear buffer used for non processed entries
+      worker.clearWALEntryBatch();

Review comment:
       Given that `clearWALEntryBatch` will wait until the worker is no longer alive, why are we waiting for it here as well?

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
##########
@@ -324,4 +326,39 @@ void stopWorker() {
   public boolean isFinished() {
     return state == WorkerState.FINISHED;
   }
+
+  /**
+   * Attempts to properly update <code>ReplicationSourceManager.totalBufferUser</code>,
+   * in case there were unprocessed entries batched by the reader to the shipper,
+   * but the shipper didn't manage to ship those because the replication source is being terminated.
+   * In that case, it iterates through the batched entries and decrease the pending
+   * entries size from <code>ReplicationSourceManager.totalBufferUser</code>
+   * <p/>
+   * <b>NOTE</b> This method should be only called upon replication source termination.
+   * It blocks waiting for both shipper and reader threads termination,
+   * to make sure no race conditions
+   * when updating <code>ReplicationSourceManager.totalBufferUser</code>.
+   */
+  void clearWALEntryBatch() {
+    while(this.isAlive() || this.entryReader.isAlive()){
+      try {
+        // Wait both shipper and reader threads to stop
+        Thread.sleep(this.sleepForRetries);
+      } catch (InterruptedException e) {
+        LOG.info("{} Interrupted while waiting {} to stop on clearWALEntryBatch",

Review comment:
       Is info the right level here? Maybe it is, but if we get interrupted, that means we could go on to do the update below while racing with the other threads, right?
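       One possible way to avoid a racy update, sketched under the assumption that it is acceptable to skip the cleanup when the wait is interrupted. `GuardedBufferRelease`, `releaseAfterStop`, and the `AtomicLong` counter are hypothetical stand-ins for illustration, not the PR's code. Note that `Thread.sleep` throws immediately when the interrupt status is already set, so a loop that restores the status and keeps polling would spin rather than wait.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch, not HBase code: release buffer usage only after the worker has stopped. */
public final class GuardedBufferRelease {
  private GuardedBufferRelease() {
  }

  /**
   * Polls until the worker thread is dead, then subtracts pendingBytes from the counter.
   * If the calling thread is interrupted first, the counter is left untouched.
   *
   * @return true if the counter was updated, false if the update was skipped
   */
  public static boolean releaseAfterStop(Thread worker, long pollMs,
      AtomicLong totalBufferUsed, long pendingBytes) {
    while (worker.isAlive()) {
      try {
        Thread.sleep(pollMs);
      } catch (InterruptedException e) {
        // Restore the interrupt status and bail out: the worker may still be
        // touching the batch, so updating the counter now could race with it.
        Thread.currentThread().interrupt();
        return false;
      }
    }
    totalBufferUsed.addAndGet(-pendingBytes);
    return true;
  }
}
```

       A caller that gets false back would probably want to log at warn rather than info, since the buffer accounting is then knowingly left stale.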



