You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by wc...@apache.org on 2023/06/15 20:38:48 UTC
[hbase] branch branch-2.5 updated: HBASE-27871 Meta replication stuck forever if wal it's still reading gets rolled and deleted (#5271)
This is an automated email from the ASF dual-hosted git repository.
wchevreuil pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.5 by this push:
new 91627ce8e09 HBASE-27871 Meta replication stuck forever if wal it's still reading gets rolled and deleted (#5271)
91627ce8e09 is described below
commit 91627ce8e09be25c4fe47f115d4ca15e6cbe567e
Author: Wellington Ramos Chevreuil <wc...@apache.org>
AuthorDate: Thu Jun 15 21:38:39 2023 +0100
HBASE-27871 Meta replication stuck forever if wal it's still reading gets rolled and deleted (#5271)
Signed-off-by: Peter Somogyi <ps...@apache.org>
---
.../regionserver/ReplicationSourceWALReader.java | 6 +++
.../replication/regionserver/WALEntryStream.java | 6 ++-
.../TestMetaRegionReplicaReplicationEndpoint.java | 46 ++++++++++++++++++++++
3 files changed, 57 insertions(+), 1 deletion(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index 256f9a78624..e6718c06829 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -24,6 +24,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -79,6 +80,8 @@ class ReplicationSourceWALReader extends Thread {
private long totalBufferQuota;
private final String walGroupId;
+ AtomicBoolean waitingPeerEnabled = new AtomicBoolean(false);
+
/**
* Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the
* entries, and puts them on a batch queue.
@@ -130,8 +133,11 @@ class ReplicationSourceWALReader extends Thread {
while (isReaderRunning()) { // loop here to keep reusing stream while we can
batch = null;
if (!source.isPeerEnabled()) {
+ waitingPeerEnabled.set(true);
Threads.sleep(sleepForRetries);
continue;
+ } else {
+ waitingPeerEnabled.set(false);
}
if (!checkQuota()) {
continue;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
index aa059aa30a2..cbaf3c6f6e9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.OptionalLong;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration;
@@ -386,7 +387,10 @@ class WALEntryStream implements Closeable {
if (archivedLog != null) {
openReader(archivedLog);
} else {
- throw fnfe;
+ // For now, this could happen only when reading meta wal for meta replicas.
+ // In this case, raising UncheckedIOException will let the endpoint deal with resetting
+ // the replication source. See HBASE-27871.
+ throw new UncheckedIOException(fnfe);
}
} catch (NullPointerException npe) {
throw new IOException("NPE resetting reader, likely HDFS-4380", npe);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java
index 7ca651dbe77..7196c2827ca 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java
@@ -29,6 +29,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
+import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
@@ -53,6 +54,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
@@ -225,6 +227,50 @@ public class TestMetaRegionReplicaReplicationEndpoint {
}
}
+ @Test
+ public void testCatalogReplicaReplicationWALRolledAndDeleted() throws Exception {
+ TableName tableName = TableName.valueOf("hbase:meta");
+ try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
+ Table table = connection.getTable(tableName)) {
+ MiniHBaseCluster cluster = HTU.getHBaseCluster();
+ cluster.getMaster().balanceSwitch(false);
+ HRegionServer hrs = cluster.getRegionServer(cluster.getServerHoldingMeta());
+ ReplicationSource source = (ReplicationSource) hrs.getReplicationSourceService()
+ .getReplicationManager().catalogReplicationSource.get();
+ ((ReplicationPeerImpl) source.replicationPeer).setPeerState(false);
+ // There's a small chance the source reader has passed the peer state check but not yet read
+ // the wal, which could allow it to read some added entries before the wal gets deleted,
+ // so here we make sure we only proceed once the reader loop of every queue has
+ // detected that the peer is disabled.
+ HTU.waitFor(2000, 100, true, () -> {
+ MutableObject<Boolean> readerWaiting = new MutableObject<>(true);
+ source.logQueue.getQueues().keySet()
+ .forEach(w -> readerWaiting.setValue(readerWaiting.getValue()
+ && source.workerThreads.get(w).entryReader.waitingPeerEnabled.get()));
+ return readerWaiting.getValue();
+ });
+ // load the data into the table
+ for (int i = 0; i < 5; i++) {
+ LOG.info("Writing data from " + i * 1000 + " to " + (i * 1000 + 1000));
+ HTU.loadNumericRows(table, HConstants.CATALOG_FAMILY, i * 1000, i * 1000 + 1000);
+ LOG.info("flushing table");
+ HTU.flush(tableName);
+ LOG.info("compacting table");
+ if (i < 4) {
+ HTU.compact(tableName, false);
+ }
+ }
+ HTU.getHBaseCluster().getMaster().getLogCleaner().runCleaner();
+ ((ReplicationPeerImpl) source.replicationPeer).setPeerState(true);
+ // now load more data without flushing or compacting
+ for (int i = 5; i < 10; i++) {
+ LOG.info("Writing data from " + i * 1000 + " to " + (i * 1000 + 1000));
+ HTU.loadNumericRows(table, HConstants.CATALOG_FAMILY, i * 1000, i * 1000 + 1000);
+ }
+ verifyReplication(tableName, numOfMetaReplica, 0, 10000, HConstants.CATALOG_FAMILY);
+ }
+ }
+
@Test
public void testCatalogReplicaReplicationWithReplicaMoved() throws Exception {
MiniHBaseCluster cluster = HTU.getMiniHBaseCluster();