You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text rendering.)
Posted to commits@hbase.apache.org by wc...@apache.org on 2023/06/15 20:38:48 UTC

[hbase] branch branch-2.5 updated: HBASE-27871 Meta replication stuck forever if wal it's still reading gets rolled and deleted (#5271)

This is an automated email from the ASF dual-hosted git repository.

wchevreuil pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.5 by this push:
     new 91627ce8e09 HBASE-27871 Meta replication stuck forever if wal it's still reading gets rolled and deleted (#5271)
91627ce8e09 is described below

commit 91627ce8e09be25c4fe47f115d4ca15e6cbe567e
Author: Wellington Ramos Chevreuil <wc...@apache.org>
AuthorDate: Thu Jun 15 21:38:39 2023 +0100

    HBASE-27871 Meta replication stuck forever if wal it's still reading gets rolled and deleted (#5271)
    
    Signed-off-by: Peter Somogyi <ps...@apache.org>
---
 .../regionserver/ReplicationSourceWALReader.java   |  6 +++
 .../replication/regionserver/WALEntryStream.java   |  6 ++-
 .../TestMetaRegionReplicaReplicationEndpoint.java  | 46 ++++++++++++++++++++++
 3 files changed, 57 insertions(+), 1 deletion(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index 256f9a78624..e6718c06829 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -24,6 +24,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -79,6 +80,8 @@ class ReplicationSourceWALReader extends Thread {
   private long totalBufferQuota;
   private final String walGroupId;
 
+  AtomicBoolean waitingPeerEnabled = new AtomicBoolean(false);
+
   /**
    * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the
    * entries, and puts them on a batch queue.
@@ -130,8 +133,11 @@ class ReplicationSourceWALReader extends Thread {
         while (isReaderRunning()) { // loop here to keep reusing stream while we can
           batch = null;
           if (!source.isPeerEnabled()) {
+            waitingPeerEnabled.set(true);
             Threads.sleep(sleepForRetries);
             continue;
+          } else {
+            waitingPeerEnabled.set(false);
           }
           if (!checkQuota()) {
             continue;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
index aa059aa30a2..cbaf3c6f6e9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import java.io.Closeable;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.OptionalLong;
 import java.util.concurrent.PriorityBlockingQueue;
 import org.apache.hadoop.conf.Configuration;
@@ -386,7 +387,10 @@ class WALEntryStream implements Closeable {
       if (archivedLog != null) {
         openReader(archivedLog);
       } else {
-        throw fnfe;
+        // For now, this could happen only when reading meta wal for meta replicas.
+        // In this case, raising UncheckedIOException will let the endpoint deal with resetting
+        // the replication source. See HBASE-27871.
+        throw new UncheckedIOException(fnfe);
       }
     } catch (NullPointerException npe) {
       throw new IOException("NPE resetting reader, likely HDFS-4380", npe);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java
index 7ca651dbe77..7196c2827ca 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java
@@ -29,6 +29,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
+import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
@@ -53,6 +54,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
@@ -225,6 +227,50 @@ public class TestMetaRegionReplicaReplicationEndpoint {
     }
   }
 
+  @Test
+  public void testCatalogReplicaReplicationWALRolledAndDeleted() throws Exception {
+    TableName tableName = TableName.valueOf("hbase:meta");
+    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
+      Table table = connection.getTable(tableName)) {
+      MiniHBaseCluster cluster = HTU.getHBaseCluster();
+      cluster.getMaster().balanceSwitch(false);
+      HRegionServer hrs = cluster.getRegionServer(cluster.getServerHoldingMeta());
+      ReplicationSource source = (ReplicationSource) hrs.getReplicationSourceService()
+        .getReplicationManager().catalogReplicationSource.get();
+      ((ReplicationPeerImpl) source.replicationPeer).setPeerState(false);
+      // There's a small chance the source reader has passed the peer state check but not yet
+      // read the WAL, which could let it read some of the entries added below before the WAL
+      // gets deleted. So only proceed once every reader loop has detected the peer is disabled.
+      // (The reader sets waitingPeerEnabled each time it finds the peer disabled.)
+      HTU.waitFor(2000, 100, true, () -> {
+        MutableObject<Boolean> readerWaiting = new MutableObject<>(true);
+        source.logQueue.getQueues().keySet()
+          .forEach(w -> readerWaiting.setValue(readerWaiting.getValue()
+            && source.workerThreads.get(w).entryReader.waitingPeerEnabled.get()));
+        return readerWaiting.getValue();
+      });
+      // Load the data into the table, flushing it each round and compacting all but the last.
+      for (int i = 0; i < 5; i++) {
+        LOG.info("Writing data from " + i * 1000 + " to " + (i * 1000 + 1000));
+        HTU.loadNumericRows(table, HConstants.CATALOG_FAMILY, i * 1000, i * 1000 + 1000);
+        LOG.info("flushing table");
+        HTU.flush(tableName);
+        LOG.info("compacting table");
+        if (i < 4) {
+          HTU.compact(tableName, false);
+        }
+      }
+      HTU.getHBaseCluster().getMaster().getLogCleaner().runCleaner();
+      ((ReplicationPeerImpl) source.replicationPeer).setPeerState(true);
+      // Now load more data without flushing or compacting.
+      for (int i = 5; i < 10; i++) {
+        LOG.info("Writing data from " + i * 1000 + " to " + (i * 1000 + 1000));
+        HTU.loadNumericRows(table, HConstants.CATALOG_FAMILY, i * 1000, i * 1000 + 1000);
+      }
+      verifyReplication(tableName, numOfMetaReplica, 0, 10000, HConstants.CATALOG_FAMILY);
+    }
+  }
+
   @Test
   public void testCatalogReplicaReplicationWithReplicaMoved() throws Exception {
     MiniHBaseCluster cluster = HTU.getMiniHBaseCluster();