You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2020/08/24 14:41:38 UTC
[hbase] branch branch-2 updated: HBASE-24871 Replication may loss
data when refresh recovered replication sources (#2249)
This is an automated email from the ASF dual-hosted git repository.
zghao pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 0e63b12 HBASE-24871 Replication may loss data when refresh recovered replication sources (#2249)
0e63b12 is described below
commit 0e63b12648abfffca4549f4d23ff68431c28ec0b
Author: XinSun <dd...@gmail.com>
AuthorDate: Mon Aug 24 21:43:15 2020 +0800
HBASE-24871 Replication may loss data when refresh recovered replication sources (#2249)
Signed-off-by: huaxiangsun <hu...@apache.org>
Signed-off-by: Guanghao Zhang <zg...@apache.org>
Conflicts:
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
---
.../regionserver/ReplicationSourceManager.java | 13 +-
.../hbase/replication/TestReplicationBase.java | 2 +-
.../TestRefreshRecoveredReplication.java | 161 +++++++++++++++++++++
3 files changed, 166 insertions(+), 10 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index d69d8ea..4c4b0de 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -417,17 +417,12 @@ public class ReplicationSourceManager implements ReplicationListener {
}
}
for (String queueId : previousQueueIds) {
- ReplicationSourceInterface replicationSource = createSource(queueId, peer);
- this.oldsources.add(replicationSource);
- LOG.trace("Added source for recovered queue: " + src.getQueueId());
+ ReplicationSourceInterface recoveredReplicationSource = createSource(queueId, peer);
+ this.oldsources.add(recoveredReplicationSource);
for (SortedSet<String> walsByGroup : walsByIdRecoveredQueues.get(queueId).values()) {
- walsByGroup.forEach(wal -> {
- LOG.trace("Enqueueing log from recovered queue for source: {}",
- src.getQueueId());
- src.enqueueLog(new Path(wal));
- });
+ walsByGroup.forEach(wal -> recoveredReplicationSource.enqueueLog(new Path(wal)));
}
- toStartup.add(replicationSource);
+ toStartup.add(recoveredReplicationSource);
}
}
for (ReplicationSourceInterface replicationSource : toStartup) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index beedd5e..e76c222 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -77,7 +77,7 @@ public class TestReplicationBase {
protected static Configuration CONF1 = UTIL1.getConfiguration();
protected static Configuration CONF2 = UTIL2.getConfiguration();
- protected static final int NUM_SLAVES1 = 1;
+ protected static int NUM_SLAVES1 = 1;
protected static final int NUM_SLAVES2 = 1;
protected static final int NB_ROWS_IN_BATCH = 100;
protected static final int NB_ROWS_IN_BIG_BATCH =
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRefreshRecoveredReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRefreshRecoveredReplication.java
new file mode 100644
index 0000000..f84f32a
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRefreshRecoveredReplication.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.replication.TestReplicationBase;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
+
+/** Testcase for HBASE-24871: refreshing recovered replication sources must not lose data.
+ * Aborts the RS holding the table, toggles the peer to force a refresh, then counts rows on UTIL2.
+ */
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestRefreshRecoveredReplication extends TestReplicationBase {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestRefreshRecoveredReplication.class);
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestRefreshRecoveredReplication.class);
+
+ private static final int BATCH = 50; // rows written to table1 and expected on table2
+
+ @Rule
+ public TestName name = new TestName();
+
+ private TableName tablename; // named after the running test method (see setup())
+ private Table table1; // table on the source cluster (UTIL1), written by the test
+ private Table table2; // same table on UTIL2, read to count replicated rows
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ NUM_SLAVES1 = 2; // must be set before base setup; one RS has to survive the abort below
+ // replicate slowly: tiny total source buffer, presumably so WAL edits are still queued at abort
+ Configuration conf1 = UTIL1.getConfiguration();
+ conf1.setInt(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, 100);
+ TestReplicationBase.setUpBeforeClass();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TestReplicationBase.tearDownAfterClass();
+ }
+
+ @Before
+ public void setup() throws Exception {
+ setUpBase();
+
+ tablename = TableName.valueOf(name.getMethodName()); // one table per test method
+ TableDescriptor table = TableDescriptorBuilder.newBuilder(tablename)
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName)
+ .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) // family is replicated
+ .build();
+
+ UTIL1.getAdmin().createTable(table);
+ UTIL2.getAdmin().createTable(table);
+ UTIL1.waitTableAvailable(tablename);
+ UTIL2.waitTableAvailable(tablename);
+ table1 = UTIL1.getConnection().getTable(tablename);
+ table2 = UTIL2.getConnection().getTable(tablename);
+ }
+
+ @After
+ public void teardown() throws Exception {
+ tearDownBase(); // NOTE(review): table1/table2 are never closed; consider closing them here
+
+ UTIL1.deleteTableIfAny(tablename);
+ UTIL2.deleteTableIfAny(tablename);
+ }
+
+ @Test
+ public void testReplicationRefreshSource() throws Exception {
+ // write BATCH rows to the source table
+ for (int i = 0; i < BATCH; i++) {
+ byte[] r = Bytes.toBytes(i);
+ table1.put(new Put(r).addColumn(famName, famName, r));
+ }
+
+ // abort a region server that hosts a region of the table
+ Optional<RegionServerThread> server = UTIL1.getMiniHBaseCluster().getLiveRegionServerThreads()
+ .stream()
+ .filter(rst -> CollectionUtils.isNotEmpty(rst.getRegionServer().getRegions(tablename)))
+ .findAny();
+ Assert.assertTrue(server.isPresent());
+ server.get().getRegionServer().abort("stopping for test");
+ UTIL1.waitFor(60000, () ->
+ UTIL1.getMiniHBaseCluster().getLiveRegionServerThreads().size() == NUM_SLAVES1 - 1);
+ UTIL1.waitTableAvailable(tablename);
+
+ // wait until the surviving RS has at least one recovered (old) replication source
+ Replication replication = (Replication) UTIL1.getMiniHBaseCluster()
+ .getLiveRegionServerThreads().get(0).getRegionServer().getReplicationSourceService();
+ UTIL1.waitFor(60000, () ->
+ !replication.getReplicationManager().getOldSources().isEmpty());
+
+ // toggle the peer (disable, then enable) to trigger refreshSources
+ hbaseAdmin.disableReplicationPeer(PEER_ID2);
+ LOG.info("has replicated {} rows before disable peer", checkReplicationData());
+ hbaseAdmin.enableReplicationPeer(PEER_ID2);
+ // wait for all BATCH rows on UTIL2; with the HBASE-24871 bug some rows may never arrive
+ UTIL2.waitFor(60000, () -> {
+ int count = checkReplicationData();
+ LOG.info("Waiting all logs pushed to slave. Expected {} , actual {}", BATCH, count);
+ return count == BATCH;
+ });
+ }
+
+ private int checkReplicationData() throws IOException { // counts rows currently in table2
+ int count = 0; // NOTE(review): scanner below is never closed; use try-with-resources
+ ResultScanner results = table2.getScanner(new Scan().setCaching(BATCH));
+ for (Result r : results) {
+ count++;
+ }
+ return count;
+ }
+}