Posted to commits@hbase.apache.org by wc...@apache.org on 2023/06/05 20:34:01 UTC

[hbase] branch branch-2 updated: HBASE-27871 Meta replication stuck forever if wal it's still reading gets rolled and deleted (#5241)

This is an automated email from the ASF dual-hosted git repository.

wchevreuil pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new d43dfcce902 HBASE-27871 Meta replication stuck forever if wal it's still reading gets rolled and deleted (#5241)
d43dfcce902 is described below

commit d43dfcce902da5c999f46f1122f01d03cdd8e5b3
Author: Wellington Ramos Chevreuil <wc...@apache.org>
AuthorDate: Mon Jun 5 21:33:48 2023 +0100

    HBASE-27871 Meta replication stuck forever if wal it's still reading gets rolled and deleted (#5241)
    
    Signed-off-by: Ankit Singhal <an...@apache.org>
    Signed-off-by: Duo Zhang <zh...@apache.org>
    Reviewed by: Andor Molnár <an...@cloudera.com>
    Reviewed by: Shanmukha Kota <sk...@cloudera.com>
---
 .../regionserver/ReplicationSourceWALReader.java   |  6 +++
 .../replication/regionserver/WALEntryStream.java   |  6 +++
 .../TestMetaRegionReplicaReplicationEndpoint.java  | 48 ++++++++++++++++++++++
 3 files changed, 60 insertions(+)
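
The gist of the change: a region server tails the meta WAL to replicate edits to meta replicas. If the peer is disabled long enough for that WAL to be rolled and then removed by the log cleaner, the reader would retry the vanished file forever, leaving meta replication stuck. The patch instead surfaces the missing file to the replication endpoint so it can reset the source, and exposes a flag the test uses to synchronize with the reader. A minimal sketch of the reader-side idea, with hypothetical names rather than the real HBase classes:

    import java.io.FileNotFoundException;
    import java.io.IOException;
    import java.io.UncheckedIOException;

    // Illustrative only: transient IO errors are retried, but a WAL that was
    // rolled and deleted can never be read again, so that case is rethrown
    // unchecked to break out of the retry loop.
    class WalReadLoopSketch {
      interface WalReader {
        boolean readNext() throws IOException; // hypothetical reader API
      }

      static void readLoop(WalReader reader) {
        while (true) {
          try {
            if (!reader.readNext()) {
              return; // caught up; caller decides when to poll again
            }
            // ... batch and ship the entry
          } catch (FileNotFoundException e) {
            // Unrecoverable from here: let a supervising component reset us.
            throw new UncheckedIOException(e);
          } catch (IOException e) {
            // Transient: retry from the last committed position
            // (a real loop would back off before retrying).
          }
        }
      }
    }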

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index bd5b7736f3b..26360cbe3ea 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -75,6 +76,8 @@ class ReplicationSourceWALReader extends Thread {
   private boolean isReaderRunning = true;
   private final String walGroupId;
 
+  AtomicBoolean waitingPeerEnabled = new AtomicBoolean(false);
+
   /**
    * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the
    * entries, and puts them on a batch queue.
@@ -137,8 +140,11 @@ class ReplicationSourceWALReader extends Thread {
         source.getWALFileLengthProvider(), source.getSourceMetrics(), walGroupId)) {
         while (isReaderRunning()) { // loop here to keep reusing stream while we can
           if (!source.isPeerEnabled()) {
+            waitingPeerEnabled.set(true);
             Threads.sleep(sleepForRetries);
             continue;
+          } else {
+            waitingPeerEnabled.set(false);
           }
           if (!checkBufferQuota()) {
             continue;
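
The waitingPeerEnabled flag added above is test-facing: it lets a test confirm that the reader thread has actually observed the disabled peer before the test rolls and deletes WALs out from under it, closing the race described in the new test further down. The same publish-and-poll pattern in isolation, again with illustrative names:

    import java.util.concurrent.atomic.AtomicBoolean;

    // Sketch of the synchronization idea, not the HBase classes: the worker
    // publishes whether it is parked on a disabled peer, and the test only
    // proceeds once that flag is visible.
    class PollingWorkerSketch extends Thread {
      final AtomicBoolean waitingPeerEnabled = new AtomicBoolean(false);
      volatile boolean peerEnabled = false; // toggled by the test

      @Override
      public void run() {
        while (!Thread.currentThread().isInterrupted()) {
          if (!peerEnabled) {
            waitingPeerEnabled.set(true); // tell the test we are parked
            try {
              Thread.sleep(100);
            } catch (InterruptedException e) {
              return;
            }
            continue;
          }
          waitingPeerEnabled.set(false);
          // ... read and ship WAL entries
        }
      }
    }

    // Test side (sketch): spin until the worker has seen the disabled peer,
    // then it is safe to write, roll and delete WALs.
    //   while (!worker.waitingPeerEnabled.get()) { Thread.sleep(10); }
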
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
index c6268674c5b..22bf05b3741 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import java.io.Closeable;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.OptionalLong;
 import java.util.concurrent.PriorityBlockingQueue;
 import org.apache.hadoop.conf.Configuration;
@@ -219,6 +220,11 @@ class WALEntryStream implements Closeable {
             // we will read from the beginning so we should always clear the compression context
             reader.resetTo(-1, true);
           }
+        } catch (FileNotFoundException e) {
+          // For now, this can only happen when reading the meta wal for meta replicas.
+          // In this case, raising UncheckedIOException will let the endpoint deal with resetting
+          // the replication source. See HBASE-27871.
+          throw new UncheckedIOException(e);
         } catch (IOException e) {
           LOG.warn("Failed to reset reader {} to pos {}, reset compression={}", currentPath,
             currentPositionOfEntry, state.resetCompression(), e);
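
resetTo() runs inside a catch chain that treats IOException as retryable, which is exactly the behavior HBASE-27871 needs to escape: a deleted WAL will never reappear, so wrapping the FileNotFoundException in an UncheckedIOException lets it bypass the retry and reach the endpoint. The endpoint-side handling is not part of this diff; a plausible shape for it, stated purely as an assumption, is:

    import java.io.FileNotFoundException;
    import java.io.UncheckedIOException;

    // Hypothetical supervisor-side unwrap; names are illustrative.
    class SourceResetSketch {
      static void runSource(Runnable readLoop, Runnable resetSource) {
        try {
          readLoop.run(); // may throw UncheckedIOException from the reader
        } catch (UncheckedIOException e) {
          if (e.getCause() instanceof FileNotFoundException) {
            // WAL rolled and deleted underneath the reader: rebuild the
            // source at the latest WAL instead of spinning (HBASE-27871).
            resetSource.run();
          } else {
            throw e; // anything else is unexpected here
          }
        }
      }
    }
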
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java
index 7ca651dbe77..d3e644ff433 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java
@@ -29,6 +29,8 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
@@ -53,6 +55,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
@@ -225,6 +228,51 @@ public class TestMetaRegionReplicaReplicationEndpoint {
     }
   }
 
+  @Test
+  public void testCatalogReplicaReplicationWALRolledAndDeleted() throws Exception {
+    TableName tableName = TableName.valueOf("hbase:meta");
+    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
+      Table table = connection.getTable(tableName)) {
+      MiniHBaseCluster cluster = HTU.getHBaseCluster();
+      cluster.getMaster().balanceSwitch(false);
+      HRegionServer hrs = cluster.getRegionServer(cluster.getServerHoldingMeta());
+      ReplicationSource source = (ReplicationSource) hrs.getReplicationSourceService()
+        .getReplicationManager().catalogReplicationSource.get();
+      ((ReplicationPeerImpl) source.replicationPeer).setPeerState(false);
+      // there's a small chance the source reader has passed the peer state check but not yet
+      // read the wal, which could allow it to read some added entries before the wal gets
+      // deleted, so here we make sure we only proceed once the reader loop has detected that
+      // the peer is disabled.
+      HTU.waitFor(2000, 100, true, () -> {
+        MutableObject<Boolean> readerWaiting = new MutableObject<>(true);
+        source.logQueue.getQueues().keySet()
+          .forEach(w -> readerWaiting.setValue(readerWaiting.getValue()
+            && source.workerThreads.get(w).entryReader.waitingPeerEnabled.get()));
+        return readerWaiting.getValue();
+      });
+      // load the data to the table
+      for (int i = 0; i < 5; i++) {
+        LOG.info("Writing data from " + i * 1000 + " to " + (i * 1000 + 1000));
+        HTU.loadNumericRows(table, HConstants.CATALOG_FAMILY, i * 1000, i * 1000 + 1000);
+        LOG.info("flushing table");
+        HTU.flush(tableName);
+        LOG.info("compacting table");
+        if (i < 4) {
+          HTU.compact(tableName, false);
+        }
+      }
+      HTU.getHBaseCluster().getMaster().getLogCleaner().triggerCleanerNow().get(1,
+        TimeUnit.SECONDS);
+      ((ReplicationPeerImpl) source.replicationPeer).setPeerState(true);
+      // now load more data without flushing or compacting
+      for (int i = 5; i < 10; i++) {
+        LOG.info("Writing data from " + i * 1000 + " to " + (i * 1000 + 1000));
+        HTU.loadNumericRows(table, HConstants.CATALOG_FAMILY, i * 1000, i * 1000 + 1000);
+      }
+      verifyReplication(tableName, numOfMetaReplica, 0, 10000, HConstants.CATALOG_FAMILY);
+    }
+  }
+
   @Test
   public void testCatalogReplicaReplicationWithReplicaMoved() throws Exception {
     MiniHBaseCluster cluster = HTU.getMiniHBaseCluster();
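
The new test drives the failure deterministically: park the reader on a disabled peer, roll the meta WAL through repeated loads, flushes and compactions, let the log cleaner delete the rolled WALs, then re-enable the peer and verify that all meta replicas still converge on the full 10000 rows. As an aside, the MutableObject reduction inside the waitFor predicate could be written more directly with allMatch; a self-contained sketch of that equivalent check, with a hypothetical Reader interface standing in for the package-private fields:

    import java.util.Map;
    import java.util.Set;

    // Hypothetical equivalent of the waitFor predicate in the test above.
    final class AllWaitingSketch {
      interface Reader {
        boolean isWaitingPeerEnabled();
      }

      static boolean allReadersWaiting(Set<String> walGroups, Map<String, Reader> readers) {
        return walGroups.stream().allMatch(w -> readers.get(w).isWaitingPeerEnabled());
      }
    }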