You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by su...@apache.org on 2021/02/20 02:27:25 UTC
[hbase] branch branch-2 updated: HBASE-25562
ReplicationSourceWALReader log and handle exception immediately without
retrying (#2943)
This is an automated email from the ASF dual-hosted git repository.
sunxin pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new a3edcc2 HBASE-25562 ReplicationSourceWALReader log and handle exception immediately without retrying (#2943)
a3edcc2 is described below
commit a3edcc285481357446ed02effbb83a4142402c8f
Author: XinSun <dd...@gmail.com>
AuthorDate: Sat Feb 20 10:20:54 2021 +0800
HBASE-25562 ReplicationSourceWALReader log and handle exception immediately without retrying (#2943)
Signed-off-by: Wellington Chevreuil <wc...@apache.org>
Signed-off-by: stack <st...@apache.org>
Signed-off-by: shahrs87
---
.../regionserver/ReplicationSourceWALReader.java | 30 ++++++++++++----------
1 file changed, 17 insertions(+), 13 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index 52ac144..f52a83a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -150,14 +150,13 @@ class ReplicationSourceWALReader extends Thread {
}
}
} catch (IOException e) { // stream related
- if (sleepMultiplier < maxRetriesMultiplier) {
- LOG.debug("Failed to read stream of replication entries: " + e);
- sleepMultiplier++;
- } else {
- LOG.error("Failed to read stream of replication entries", e);
- handleEofException(e);
+ if (!handleEofException(e)) {
+ LOG.warn("Failed to read stream of replication entries", e);
+ if (sleepMultiplier < maxRetriesMultiplier) {
+ sleepMultiplier ++;
+ }
+ Threads.sleep(sleepForRetries * sleepMultiplier);
}
- Threads.sleep(sleepForRetries * sleepMultiplier);
} catch (InterruptedException e) {
LOG.trace("Interrupted while sleeping between WAL reads");
Thread.currentThread().interrupt();
@@ -244,10 +243,13 @@ class ReplicationSourceWALReader extends Thread {
}
}
- // if we get an EOF due to a zero-length log, and there are other logs in queue
- // (highly likely we've closed the current log), we've hit the max retries, and autorecovery is
- // enabled, then dump the log
- private void handleEofException(IOException e) {
+ /**
+ * if we get an EOF due to a zero-length log, and there are other logs in queue
+ * (highly likely we've closed the current log), and autorecovery is
+ * enabled, then dump the log
+ * @return true only the IOE can be handled
+ */
+ private boolean handleEofException(IOException e) {
PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
// Dump the log even if logQueue size is 1 if the source is from recovered Source
// since we don't add current log to recovered source queue so it is safe to remove.
@@ -255,14 +257,16 @@ class ReplicationSourceWALReader extends Thread {
(source.isRecovered() || queue.size() > 1) && this.eofAutoRecovery) {
try {
if (fs.getFileStatus(queue.peek()).getLen() == 0) {
- LOG.warn("Forcing removal of 0 length log in queue: " + queue.peek());
+ LOG.warn("Forcing removal of 0 length log in queue: {}", queue.peek());
logQueue.remove(walGroupId);
currentPosition = 0;
+ return true;
}
} catch (IOException ioe) {
- LOG.warn("Couldn't get file length information about log " + queue.peek());
+ LOG.warn("Couldn't get file length information about log {}", queue.peek());
}
}
+ return false;
}
public Path getCurrentPath() {