Posted to commits@hbase.apache.org by zg...@apache.org on 2020/08/24 14:57:16 UTC

[hbase] branch branch-2.2 updated: HBASE-24871 Replication may loss data when refresh recovered replication sources (#2249)

This is an automated email from the ASF dual-hosted git repository.

zghao pushed a commit to branch branch-2.2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.2 by this push:
     new e300ae3  HBASE-24871 Replication may loss data when refresh recovered replication sources (#2249)
e300ae3 is described below

commit e300ae356a84afdf806078220b4929f48ddd93c7
Author: XinSun <dd...@gmail.com>
AuthorDate: Mon Aug 24 21:43:15 2020 +0800

    HBASE-24871 Replication may loss data when refresh recovered replication sources (#2249)
    
    Signed-off-by: huaxiangsun <hu...@apache.org>
    Signed-off-by: Guanghao Zhang <zg...@apache.org>
    Conflicts:
    	hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
    
---
 .../regionserver/ReplicationSourceManager.java     |   8 +-
 .../hbase/replication/TestReplicationBase.java     |   2 +-
 .../TestRefreshRecoveredReplication.java           | 161 +++++++++++++++++++++
 3 files changed, 166 insertions(+), 5 deletions(-)
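
For context, the first hunk below is the actual fix: the WALs of each recovered replication queue are now enqueued into the source created for that queue (recoveredReplicationSource) instead of a stale src reference, so a recovered source no longer starts with nothing to ship. The standalone sketch below (illustrative names only, not HBase code) shows the shape of that bug and of the fix:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative sketch of the bug pattern fixed in ReplicationSourceManager:
// the per-iteration work was handed to a lambda capturing a stale outer
// reference, leaving the object created inside the loop empty.
public class StaleCaptureSketch {
  static class Sink {
    final List<String> logs = new ArrayList<>();
    void enqueue(String log) { logs.add(log); }
  }

  public static void main(String[] args) {
    Sink stale = new Sink();                  // plays the role of the old 'src' reference
    List<Sink> recovered = new ArrayList<>();
    for (int queue = 0; queue < 2; queue++) {
      Sink perQueue = new Sink();             // plays the role of the recovered source
      recovered.add(perQueue);
      List<String> wals = Arrays.asList("wal-" + queue + "-1", "wal-" + queue + "-2");
      // Buggy form: wals.forEach(wal -> stale.enqueue(wal)); // perQueue.logs stays empty
      wals.forEach(wal -> perQueue.enqueue(wal));             // fixed: enqueue into the per-queue sink
    }
    recovered.forEach(s -> System.out.println(s.logs));       // prints two non-empty lists
  }
}

In the hunk below, the recovered source was added to oldsources but its WALs were enqueued into src, which is the data-loss path the issue title describes.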

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 41c47af..a6fc313 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -404,12 +404,12 @@ public class ReplicationSourceManager implements ReplicationListener {
         }
       }
       for (String queueId : previousQueueIds) {
-        ReplicationSourceInterface replicationSource = createSource(queueId, peer);
-        this.oldsources.add(replicationSource);
+        ReplicationSourceInterface recoveredReplicationSource = createSource(queueId, peer);
+        this.oldsources.add(recoveredReplicationSource);
         for (SortedSet<String> walsByGroup : walsByIdRecoveredQueues.get(queueId).values()) {
-          walsByGroup.forEach(wal -> src.enqueueLog(new Path(wal)));
+          walsByGroup.forEach(wal -> recoveredReplicationSource.enqueueLog(new Path(wal)));
         }
-        toStartup.add(replicationSource);
+        toStartup.add(recoveredReplicationSource);
       }
     }
     for (ReplicationSourceInterface replicationSource : toStartup) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index ff516da..3d58b0a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -76,7 +76,7 @@ public class TestReplicationBase {
   protected static final Configuration CONF1 = UTIL1.getConfiguration();
   protected static final Configuration CONF2 = UTIL2.getConfiguration();
 
-  protected static final int NUM_SLAVES1 = 1;
+  protected static int NUM_SLAVES1 = 1;
   protected static final int NUM_SLAVES2 = 1;
   protected static final int NB_ROWS_IN_BATCH = 100;
   protected static final int NB_ROWS_IN_BIG_BATCH =
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRefreshRecoveredReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRefreshRecoveredReplication.java
new file mode 100644
index 0000000..f84f32a
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRefreshRecoveredReplication.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.replication.TestReplicationBase;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
+
+/**
+ * Testcase for HBASE-24871.
+ */
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestRefreshRecoveredReplication extends TestReplicationBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestRefreshRecoveredReplication.class);
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestRefreshRecoveredReplication.class);
+
+  private static final int BATCH = 50;
+
+  @Rule
+  public TestName name = new TestName();
+
+  private TableName tablename;
+  private Table table1;
+  private Table table2;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    NUM_SLAVES1 = 2;
+    // replicate slowly
+    Configuration conf1 = UTIL1.getConfiguration();
+    conf1.setInt(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, 100);
+    TestReplicationBase.setUpBeforeClass();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TestReplicationBase.tearDownAfterClass();
+  }
+
+  @Before
+  public void setup() throws Exception {
+    setUpBase();
+
+    tablename = TableName.valueOf(name.getMethodName());
+    TableDescriptor table = TableDescriptorBuilder.newBuilder(tablename)
+        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName)
+            .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
+        .build();
+
+    UTIL1.getAdmin().createTable(table);
+    UTIL2.getAdmin().createTable(table);
+    UTIL1.waitTableAvailable(tablename);
+    UTIL2.waitTableAvailable(tablename);
+    table1 = UTIL1.getConnection().getTable(tablename);
+    table2 = UTIL2.getConnection().getTable(tablename);
+  }
+
+  @After
+  public void teardown() throws Exception {
+    tearDownBase();
+
+    UTIL1.deleteTableIfAny(tablename);
+    UTIL2.deleteTableIfAny(tablename);
+  }
+
+  @Test
+  public void testReplicationRefreshSource() throws Exception {
+    // put some data
+    for (int i = 0; i < BATCH; i++) {
+      byte[] r = Bytes.toBytes(i);
+      table1.put(new Put(r).addColumn(famName, famName, r));
+    }
+
+    // kill rs holding table region
+    Optional<RegionServerThread> server = UTIL1.getMiniHBaseCluster().getLiveRegionServerThreads()
+        .stream()
+        .filter(rst -> CollectionUtils.isNotEmpty(rst.getRegionServer().getRegions(tablename)))
+        .findAny();
+    Assert.assertTrue(server.isPresent());
+    server.get().getRegionServer().abort("stopping for test");
+    UTIL1.waitFor(60000, () ->
+        UTIL1.getMiniHBaseCluster().getLiveRegionServerThreads().size() == NUM_SLAVES1 - 1);
+    UTIL1.waitTableAvailable(tablename);
+
+    // waiting for recovered peer to start
+    Replication replication = (Replication) UTIL1.getMiniHBaseCluster()
+        .getLiveRegionServerThreads().get(0).getRegionServer().getReplicationSourceService();
+    UTIL1.waitFor(60000, () ->
+        !replication.getReplicationManager().getOldSources().isEmpty());
+
+    // disable peer to trigger refreshSources
+    hbaseAdmin.disableReplicationPeer(PEER_ID2);
+    LOG.info("has replicated {} rows before disable peer", checkReplicationData());
+    hbaseAdmin.enableReplicationPeer(PEER_ID2);
+    // waiting to replicate all data to slave
+    UTIL2.waitFor(60000, () -> {
+      int count = checkReplicationData();
+      LOG.info("Waiting all logs pushed to slave. Expected {} , actual {}", BATCH, count);
+      return count == BATCH;
+    });
+  }
+
+  private int checkReplicationData() throws IOException {
+    int count = 0;
+    ResultScanner results = table2.getScanner(new Scan().setCaching(BATCH));
+    for (Result r : results) {
+      count++;
+    }
+    return count;
+  }
+}
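
A usage note on the new test: its key probe is the list of recovered ("old") sources exposed by the region server's replication service. A minimal helper capturing that check, assuming UTIL1 is the HBaseTestingUtility set up by TestReplicationBase (imports as in the test above):

// Sketch only: returns true once at least one recovered ("old") replication source
// is registered on the first live region server, mirroring the wait condition in
// testReplicationRefreshSource above.
private static boolean hasRecoveredSources(HBaseTestingUtility util) {
  Replication replication = (Replication) util.getMiniHBaseCluster()
      .getLiveRegionServerThreads().get(0).getRegionServer().getReplicationSourceService();
  return !replication.getReplicationManager().getOldSources().isEmpty();
}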