You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@hbase.apache.org by GitBox <gi...@apache.org> on 2021/01/11 14:50:36 UTC

[GitHub] [hbase] wchevreuil commented on a change in pull request #2849: HBASE-24813 ReplicationSource should clear buffer usage on Replicatio…

wchevreuil commented on a change in pull request #2849:
URL: https://github.com/apache/hbase/pull/2849#discussion_r555099650



##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
##########
@@ -330,4 +333,56 @@ public boolean sleepForRetries(String msg, int sleepMultiplier) {
     }
     return sleepMultiplier < maxRetriesMultiplier;
   }
+
+  /**
+   * Attempts to properly update <code>ReplicationSourceManager.totalBufferUser</code>,
+   * in case there were unprocessed entries batched by the reader to the shipper,
+   * but the shipper didn't manage to ship those because the replication source is being terminated.
+   * In that case, it iterates through the batched entries and decrease the pending
+   * entries size from <code>ReplicationSourceManager.totalBufferUser</code>
+   * <p/>
+   * <b>NOTES</b>
+   * 1) This method should only be called upon replication source termination.
+   * It blocks waiting for both shipper and reader threads termination,
+   * to make sure no race conditions
+   * when updating <code>ReplicationSourceManager.totalBufferUser</code>.
+   *
+   * 2) It <b>does not</b> attempt to terminate reader and shipper threads. Those <b>must</b>
+   * have been triggered interruption/termination prior to calling this method.
+   */
+  void clearWALEntryBatch() {
+    long timeout = System.currentTimeMillis() + this.shipEditsTimeout;
+    while(this.isAlive() || this.entryReader.isAlive()){
+      try {
+        if (System.currentTimeMillis() >= timeout) {
+          LOG.warn("Shipper clearWALEntryBatch method timed out whilst waiting reader/shipper "
+            + "thread to stop. Not cleaning buffer usage. Shipper alive: {}; Reader alive: {}",
+            this.source.getPeerId(), this.isAlive(), this.entryReader.isAlive());
+          return;
+        } else {
+          // Wait both shipper and reader threads to stop
+          Thread.sleep(this.sleepForRetries);
+        }
+      } catch (InterruptedException e) {
+        LOG.warn("{} Interrupted while waiting {} to stop on clearWALEntryBatch. "
+            + "Not cleaning buffer usage: {}", this.source.getPeerId(), this.getName(), e);
+        return;
+      }
+    }
+    LongAccumulator totalToDecrement = new LongAccumulator((a,b) -> a + b, 0);
+    entryReader.entryBatchQueue.forEach(w -> {
+      entryReader.entryBatchQueue.remove(w);

Review comment:
       My interpretation from _BlockingQueue_ javadoc is that it is: 
   
   `BlockingQueue implementations are thread-safe. All queuing methods achieve their effects atomically using internal locks or other forms of concurrency control.`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org