You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2023/10/07 06:31:08 UTC
[hbase] branch branch-2.5 updated: HBASE-28129 Do not retry refreshSources when region server is already stopping (#5453)
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.5 by this push:
new 6bf1bf66626 HBASE-28129 Do not retry refreshSources when region server is already stopping (#5453)
6bf1bf66626 is described below
commit 6bf1bf666262a7d41ab22e2adf839cf433e7f3aa
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Sat Oct 7 11:35:37 2023 +0800
HBASE-28129 Do not retry refreshSources when region server is already stopping (#5453)
Signed-off-by: GeorryHuang <hu...@apache.org>
Signed-off-by: Xiaolin Ha <ha...@apache.org>
(cherry picked from commit 6455c49239a4eeb966a4f4d9afbffc9610e6d394)
---
.../regionserver/ReplicationSource.java | 50 ++++++++++++----------
1 file changed, 27 insertions(+), 23 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 07f05b4ceed..99bb26da5fa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -347,12 +347,10 @@ public class ReplicationSource implements ReplicationSourceInterface {
ReplicationSourceShipper worker = createNewShipper(walGroupId);
ReplicationSourceWALReader walReader =
createNewWALReader(walGroupId, worker.getStartPosition());
- Threads.setDaemonThreadRunning(
- walReader, Thread.currentThread().getName() + ".replicationSource.wal-reader."
- + walGroupId + "," + queueId,
- (t, e) -> this.uncaughtException(t, e, this.manager, this.getPeerId()));
+ Threads.setDaemonThreadRunning(walReader, Thread.currentThread().getName()
+ + ".replicationSource.wal-reader." + walGroupId + "," + queueId, this::retryRefreshing);
worker.setWALReader(walReader);
- worker.startup((t, e) -> this.uncaughtException(t, e, this.manager, this.getPeerId()));
+ worker.startup(this::retryRefreshing);
return worker;
}
});
@@ -425,24 +423,30 @@ public class ReplicationSource implements ReplicationSourceInterface {
return walEntryFilter;
}
- private void uncaughtException(Thread t, Throwable e, ReplicationSourceManager manager,
- String peerId) {
- RSRpcServices.exitIfOOME(e);
- LOG.error("Unexpected exception in {} currentPath={}", t.getName(), getCurrentPath(), e);
+ // log the error, check if the error is OOME, or whether we should abort the server
+ private void checkError(Thread t, Throwable error) {
+ RSRpcServices.exitIfOOME(error);
+ LOG.error("Unexpected exception in {} currentPath={}", t.getName(), getCurrentPath(), error);
if (abortOnError) {
- server.abort("Unexpected exception in " + t.getName(), e);
+ server.abort("Unexpected exception in " + t.getName(), error);
}
- if (manager != null) {
- while (true) {
- try {
- LOG.info("Refreshing replication sources now due to previous error on thread: {}",
- t.getName());
- manager.refreshSources(peerId);
- break;
- } catch (IOException e1) {
- LOG.error("Replication sources refresh failed.", e1);
- sleepForRetries("Sleeping before try refreshing sources again", maxRetriesMultiplier);
- }
+ }
+
+ private void retryRefreshing(Thread t, Throwable error) {
+ checkError(t, error);
+ while (true) {
+ if (server.isAborted() || server.isStopped() || server.isStopping()) {
+ LOG.warn("Server is shutting down, give up refreshing source for peer {}", getPeerId());
+ return;
+ }
+ try {
+ LOG.info("Refreshing replication sources now due to previous error on thread: {}",
+ t.getName());
+ manager.refreshSources(getPeerId());
+ break;
+ } catch (Exception e) {
+ LOG.error("Replication sources refresh failed.", e);
+ sleepForRetries("Sleeping before try refreshing sources again", maxRetriesMultiplier);
}
}
}
@@ -616,7 +620,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
// keep looping in this thread until initialize eventually succeeds,
// while the server main startup one can go on with its work.
sourceRunning = false;
- uncaughtException(t, e, null, null);
+ checkError(t, e);
retryStartup.set(!this.abortOnError);
do {
if (retryStartup.get()) {
@@ -627,7 +631,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
initialize();
} catch (Throwable error) {
setSourceStartupStatus(false);
- uncaughtException(t, error, null, null);
+ checkError(t, error);
retryStartup.set(!this.abortOnError);
}
}