You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2023/10/07 06:31:08 UTC

[hbase] branch branch-2.5 updated: HBASE-28129 Do not retry refreshSources when region server is already stopping (#5453)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.5 by this push:
     new 6bf1bf66626 HBASE-28129 Do not retry refreshSources when region server is already stopping (#5453)
6bf1bf66626 is described below

commit 6bf1bf666262a7d41ab22e2adf839cf433e7f3aa
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Sat Oct 7 11:35:37 2023 +0800

    HBASE-28129 Do not retry refreshSources when region server is already stopping (#5453)
    
    Signed-off-by: GeorryHuang <hu...@apache.org>
    Signed-off-by: Xiaolin Ha <ha...@apache.org>
    (cherry picked from commit 6455c49239a4eeb966a4f4d9afbffc9610e6d394)
---
 .../regionserver/ReplicationSource.java            | 50 ++++++++++++----------
 1 file changed, 27 insertions(+), 23 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 07f05b4ceed..99bb26da5fa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -347,12 +347,10 @@ public class ReplicationSource implements ReplicationSourceInterface {
         ReplicationSourceShipper worker = createNewShipper(walGroupId);
         ReplicationSourceWALReader walReader =
           createNewWALReader(walGroupId, worker.getStartPosition());
-        Threads.setDaemonThreadRunning(
-          walReader, Thread.currentThread().getName() + ".replicationSource.wal-reader."
-            + walGroupId + "," + queueId,
-          (t, e) -> this.uncaughtException(t, e, this.manager, this.getPeerId()));
+        Threads.setDaemonThreadRunning(walReader, Thread.currentThread().getName()
+          + ".replicationSource.wal-reader." + walGroupId + "," + queueId, this::retryRefreshing);
         worker.setWALReader(walReader);
-        worker.startup((t, e) -> this.uncaughtException(t, e, this.manager, this.getPeerId()));
+        worker.startup(this::retryRefreshing);
         return worker;
       }
     });
@@ -425,24 +423,30 @@ public class ReplicationSource implements ReplicationSourceInterface {
     return walEntryFilter;
   }
 
-  private void uncaughtException(Thread t, Throwable e, ReplicationSourceManager manager,
-    String peerId) {
-    RSRpcServices.exitIfOOME(e);
-    LOG.error("Unexpected exception in {} currentPath={}", t.getName(), getCurrentPath(), e);
+  // run the OOME check, log the error, then abort the server if abortOnError is set
+  private void checkError(Thread t, Throwable error) {
+    RSRpcServices.exitIfOOME(error);
+    LOG.error("Unexpected exception in {} currentPath={}", t.getName(), getCurrentPath(), error);
     if (abortOnError) {
-      server.abort("Unexpected exception in " + t.getName(), e);
+      server.abort("Unexpected exception in " + t.getName(), error);
     }
-    if (manager != null) {
-      while (true) {
-        try {
-          LOG.info("Refreshing replication sources now due to previous error on thread: {}",
-            t.getName());
-          manager.refreshSources(peerId);
-          break;
-        } catch (IOException e1) {
-          LOG.error("Replication sources refresh failed.", e1);
-          sleepForRetries("Sleeping before try refreshing sources again", maxRetriesMultiplier);
-        }
+  }
+
+  private void retryRefreshing(Thread t, Throwable error) {
+    checkError(t, error);
+    while (true) {
+      if (server.isAborted() || server.isStopped() || server.isStopping()) {
+        LOG.warn("Server is shutting down, give up refreshing source for peer {}", getPeerId());
+        return;
+      }
+      try {
+        LOG.info("Refreshing replication sources now due to previous error on thread: {}",
+          t.getName());
+        manager.refreshSources(getPeerId());
+        break;
+      } catch (Exception e) {
+        LOG.error("Replication sources refresh failed.", e);
+        sleepForRetries("Sleeping before try refreshing sources again", maxRetriesMultiplier);
       }
     }
   }
@@ -616,7 +620,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
         // keep looping in this thread until initialize eventually succeeds,
         // while the server main startup one can go on with its work.
         sourceRunning = false;
-        uncaughtException(t, e, null, null);
+        checkError(t, e);
         retryStartup.set(!this.abortOnError);
         do {
           if (retryStartup.get()) {
@@ -627,7 +631,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
               initialize();
             } catch (Throwable error) {
               setSourceStartupStatus(false);
-              uncaughtException(t, error, null, null);
+              checkError(t, error);
               retryStartup.set(!this.abortOnError);
             }
           }