You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2021/12/05 14:26:55 UTC

[hbase] branch branch-1 updated: HBASE-26482 HMaster may clean wals that is replicating in rare cases (#3887)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-1 by this push:
     new 8d96fc3  HBASE-26482 HMaster may clean wals that is replicating in rare cases (#3887)
8d96fc3 is described below

commit 8d96fc36143d3b217493f87f93a794f50ae4c744
Author: zhengzhuobinzzb <zh...@gmail.com>
AuthorDate: Sun Dec 5 22:26:20 2021 +0800

    HBASE-26482 HMaster may clean wals that is replicating in rare cases (#3887)
    
    Signed-off-by: Duo Zhang <zh...@apache.org>
---
 .../hbase/replication/ReplicationQueuesClient.java |  8 +++++
 .../replication/ReplicationQueuesClientZKImpl.java | 14 +++++++++
 .../replication/master/ReplicationLogCleaner.java  |  6 +++-
 .../hbase/master/cleaner/TestLogsCleaner.java      | 34 ++++++++++++++++++++++
 .../replication/TestReplicationStateZKImpl.java    |  7 +++++
 5 files changed, 68 insertions(+), 1 deletion(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
index 7fa3bbb..6b4c869 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.replication;
 
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
@@ -68,6 +69,13 @@ public interface ReplicationQueuesClient {
   int getQueuesZNodeCversion() throws KeeperException;
 
   /**
+   * Get the cversion of every replicator node. Comparing two such snapshots works as an
+   * optimistic lock, e.g. to notice queues claimed by another region server in between.
+   * @return a map from replicator name to the cversion of that replicator's node
+   */
+  Map<String, Integer> getReplicatorsZNodeCversion() throws KeeperException;
+
+  /**
    * Get the change version number of replication hfile references node. This can be used as
    * optimistic locking to get a consistent snapshot of the replication queues of hfile references.
    * @return change version number of hfile references node
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
index 14b4334..1f7538c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
@@ -18,7 +18,9 @@
  */
 package org.apache.hadoop.hbase.replication;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -92,6 +94,18 @@ public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implem
     }
   }
 
+  @Override public Map<String, Integer> getReplicatorsZNodeCversion() throws KeeperException {
+    List<String> rss = super.getListOfReplicatorsZK();
+    Map<String, Integer> rsToCversion = new HashMap<>();
+    if (rss == null) return rsToCversion; // replicator list may be null: report none, don't NPE
+    for (String rs : rss) {
+      Stat stat = new Stat();
+      ZKUtil.getDataNoWatch(this.zookeeper, ZKUtil.joinZNode(this.queuesZNode, rs), stat);
+      rsToCversion.put(rs, stat.getCversion());
+    }
+    return rsToCversion;
+  }
+
   @Override
   public int getHFileRefsNodeChangeVersion() throws KeeperException {
     Stat stat = new Stat();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
index 7ac8489..cfb1afb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
@@ -106,6 +106,9 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
         LOG.debug("Didn't find any region server that replicates, won't prevent any deletions.");
         return ImmutableSet.of();
       }
+      // Also compare the cversion of every replicator node, so WALs whose queues are being
+      // claimed by another region server meanwhile are not missed. See HBASE-26482.
+      Map<String, Integer> rsToCversionBefore = replicationQueues.getReplicatorsZNodeCversion();
       Set<String> wals = Sets.newHashSet();
       for (String rs : rss) {
         List<String> listOfPeers = replicationQueues.getAllQueues(rs);
@@ -121,7 +124,8 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
         }
       }
       int v1 = replicationQueues.getQueuesZNodeCversion();
-      if (v0 == v1) {
+      Map<String, Integer> rsToCversionAfter = replicationQueues.getReplicatorsZNodeCversion();
+      if (v0 == v1 && rsToCversionBefore.equals(rsToCversionAfter)) {
         return wals;
       }
       LOG.info(String.format("Replication queue node cversion changed from %d to %d, retry = %d",
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
index 39cbc96..d4c305d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
@@ -41,6 +42,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -188,6 +190,9 @@ public class TestLogsCleaner {
 
     ReplicationQueuesClient rqcMock = mock(ReplicationQueuesClient.class);
     Mockito.when(rqcMock.getQueuesZNodeCversion()).thenReturn(1, 2, 3, 4);
+    // Report some replicators: with none, the cleaner returns before comparing cversions.
+    Mockito.when(rqcMock.getListOfReplicators())
+      .thenReturn(Lists.newArrayList("s1", "s2"));
 
     Field rqc = ReplicationLogCleaner.class.getDeclaredField("replicationQueues");
     rqc.setAccessible(true);
@@ -196,6 +201,35 @@ public class TestLogsCleaner {
 
     // This should return eventually when cversion stabilizes
     cleaner.getDeletableFiles(new LinkedList<FileStatus>());
+    // Verify the cleaner retried while the queues znode cversion kept changing.
+    Mockito.verify(rqcMock, atLeast(5)).getQueuesZNodeCversion();
+  }
+
+  @Test
+  public void testReplicatorZnodeCversionChange()
+    throws KeeperException, NoSuchFieldException, IllegalAccessException {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
+    cleaner.setConf(conf);
+
+    ReplicationQueuesClient rqcMock = mock(ReplicationQueuesClient.class);
+    // Report some replicators: with none, the cleaner returns before comparing cversions.
+    Mockito.when(rqcMock.getListOfReplicators()).thenReturn(Lists.newArrayList("s1", "s2"));
+    Mockito.when(rqcMock.getReplicatorsZNodeCversion()).thenReturn(
+      ImmutableMap.of("s1", 0, "s2", 0),
+      ImmutableMap.of("s1", 1, "s2", 1),
+      ImmutableMap.of("s1", 2, "s2", 2),
+      ImmutableMap.of("s1", 3, "s2", 3));
+
+    Field rqc = ReplicationLogCleaner.class.getDeclaredField("replicationQueues");
+    rqc.setAccessible(true);
+
+    rqc.set(cleaner, rqcMock);
+
+    // This should return eventually, once the replicator cversions stop changing
+    cleaner.getDeletableFiles(new LinkedList<FileStatus>());
+    // Verify the cleaner retried while the replicator cversions kept changing.
+    Mockito.verify(rqcMock, atLeast(5)).getReplicatorsZNodeCversion();
   }
 
   @Test(timeout=10000)
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
index f8877dc..07aaceb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
@@ -127,6 +127,13 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
     assertTrue(rqZK.isPeerPath(peerPath));
   }
 
+  @Test
+  public void testZNodeCversion() throws ReplicationException, KeeperException {
+    rq1.init(server1);
+    // rq1.init(server1) is expected to register server1 as a replicator, so it must be listed.
+    assertTrue(rqc.getReplicatorsZNodeCversion().containsKey(server1));
+  }
+
   static class DummyServer implements Server {
     private String serverName;
     private boolean isAborted = false;