You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by wc...@apache.org on 2023/06/05 20:34:01 UTC
[hbase] branch branch-2 updated: HBASE-27871 Meta replication stuck forever if wal it's still reading gets rolled and deleted (#5241)
This is an automated email from the ASF dual-hosted git repository.
wchevreuil pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new d43dfcce902 HBASE-27871 Meta replication stuck forever if wal it's still reading gets rolled and deleted (#5241)
d43dfcce902 is described below
commit d43dfcce902da5c999f46f1122f01d03cdd8e5b3
Author: Wellington Ramos Chevreuil <wc...@apache.org>
AuthorDate: Mon Jun 5 21:33:48 2023 +0100
HBASE-27871 Meta replication stuck forever if wal it's still reading gets rolled and deleted (#5241)
Signed-off-by: Ankit Singhal <an...@apache.org>
Signed-off-by: Duo Zhang <zh...@apache.org>
Reviewed by: Andor Molnár <an...@cloudera.com>
Reviewed by: Shanmukha Kota <sk...@cloudera.com>
---
.../regionserver/ReplicationSourceWALReader.java | 6 +++
.../replication/regionserver/WALEntryStream.java | 6 +++
.../TestMetaRegionReplicaReplicationEndpoint.java | 48 ++++++++++++++++++++++
3 files changed, 60 insertions(+)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index bd5b7736f3b..26360cbe3ea 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -75,6 +76,8 @@ class ReplicationSourceWALReader extends Thread {
private boolean isReaderRunning = true;
private final String walGroupId;
+ AtomicBoolean waitingPeerEnabled = new AtomicBoolean(false);
+
/**
* Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the
* entries, and puts them on a batch queue.
@@ -137,8 +140,11 @@ class ReplicationSourceWALReader extends Thread {
source.getWALFileLengthProvider(), source.getSourceMetrics(), walGroupId)) {
while (isReaderRunning()) { // loop here to keep reusing stream while we can
if (!source.isPeerEnabled()) {
+ waitingPeerEnabled.set(true);
Threads.sleep(sleepForRetries);
continue;
+ } else {
+ waitingPeerEnabled.set(false);
}
if (!checkBufferQuota()) {
continue;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
index c6268674c5b..22bf05b3741 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.OptionalLong;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration;
@@ -219,6 +220,11 @@ class WALEntryStream implements Closeable {
// we will read from the beginning so we should always clear the compression context
reader.resetTo(-1, true);
}
+ } catch (FileNotFoundException e) {
+ // For now, this could happen only when reading meta wal for meta replicas.
+ // In this case, raising UncheckedIOException will let the endpoint deal with resetting
+ // the replication source. See HBASE-27871.
+ throw new UncheckedIOException(e);
} catch (IOException e) {
LOG.warn("Failed to reset reader {} to pos {}, reset compression={}", currentPath,
currentPositionOfEntry, state.resetCompression(), e);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java
index 7ca651dbe77..d3e644ff433 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java
@@ -29,6 +29,8 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
@@ -53,6 +55,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
@@ -225,6 +228,51 @@ public class TestMetaRegionReplicaReplicationEndpoint {
}
}
+ @Test // HBASE-27871: meta replica replication must not get stuck once its WAL is rolled and deleted
+ public void testCatalogReplicaReplicationWALRolledAndDeleted() throws Exception {
+ TableName tableName = TableName.valueOf("hbase:meta");
+ try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
+ Table table = connection.getTable(tableName)) {
+ MiniHBaseCluster cluster = HTU.getHBaseCluster();
+ cluster.getMaster().balanceSwitch(false);
+ HRegionServer hrs = cluster.getRegionServer(cluster.getServerHoldingMeta());
+ ReplicationSource source = (ReplicationSource) hrs.getReplicationSourceService()
+ .getReplicationManager().catalogReplicationSource.get();
+ ((ReplicationPeerImpl) source.replicationPeer).setPeerState(false); // pause: the reader loop sleeps while the peer is disabled
+ // There's a small chance the source reader has passed the peer state check but not yet read
+ // the WAL, which could let it read some newly added entries before the WAL gets deleted.
+ // So only proceed once every WAL group's reader loop has observed the disabled peer
+ // (i.e. waitingPeerEnabled is true for all of them).
+ HTU.waitFor(2000, 100, true, () -> {
+ MutableObject<Boolean> readerWaiting = new MutableObject<>(true); // AND-accumulated across all WAL groups
+ source.logQueue.getQueues().keySet()
+ .forEach(w -> readerWaiting.setValue(readerWaiting.getValue()
+ && source.workerThreads.get(w).entryReader.waitingPeerEnabled.get()));
+ return readerWaiting.getValue();
+ });
+ // Load 5000 rows into hbase:meta, flushing after each batch and compacting all but the last.
+ for (int i = 0; i < 5; i++) {
+ LOG.info("Writing data from " + i * 1000 + " to " + (i * 1000 + 1000));
+ HTU.loadNumericRows(table, HConstants.CATALOG_FAMILY, i * 1000, i * 1000 + 1000);
+ LOG.info("flushing table");
+ HTU.flush(tableName);
+ LOG.info("compacting table");
+ if (i < 4) { // NOTE(review): the "compacting table" log above is still emitted when i == 4, where no compaction runs
+ HTU.compact(tableName, false);
+ }
+ }
+ HTU.getHBaseCluster().getMaster().getLogCleaner().triggerCleanerNow().get(1,
+ TimeUnit.SECONDS); // presumably removes the rolled meta WAL(s) the paused reader still points at — TODO confirm
+ ((ReplicationPeerImpl) source.replicationPeer).setPeerState(true);
+ // Now load more data, without flushing or compacting.
+ for (int i = 5; i < 10; i++) {
+ LOG.info("Writing data from " + i * 1000 + " to " + (i * 1000 + 1000));
+ HTU.loadNumericRows(table, HConstants.CATALOG_FAMILY, i * 1000, i * 1000 + 1000);
+ }
+ verifyReplication(tableName, numOfMetaReplica, 0, 10000, HConstants.CATALOG_FAMILY);
+ }
+ }
+
@Test
public void testCatalogReplicaReplicationWithReplicaMoved() throws Exception {
MiniHBaseCluster cluster = HTU.getMiniHBaseCluster();