You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/05/24 03:27:38 UTC
[10/37] hbase git commit: HBASE-20624 Race in ReplicationSource which
causes walEntryFilter being null when creating new shipper
HBASE-20624 Race in ReplicationSource which causes walEntryFilter being null when creating new shipper
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ee540c9f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ee540c9f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ee540c9f
Branch: refs/heads/HBASE-19064
Commit: ee540c9f9ec83def56ad15138b636271c761ed3a
Parents: c253f8f
Author: zhangduo <zh...@apache.org>
Authored: Wed May 23 21:24:49 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Thu May 24 10:48:29 2018 +0800
----------------------------------------------------------------------
.../regionserver/ReplicationSource.java | 19 +-
.../TestRaceWhenCreatingReplicationSource.java | 208 +++++++++++++++++++
2 files changed, 218 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/ee540c9f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index b05a673..4051efe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -101,8 +101,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
protected FileSystem fs;
// id of this cluster
private UUID clusterId;
- // id of the other cluster
- private UUID peerClusterId;
// total number of edits we replicated
private AtomicLong totalReplicatedEdits = new AtomicLong(0);
// The znode we currently play with
@@ -118,7 +116,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
// ReplicationEndpoint which will handle the actual replication
private volatile ReplicationEndpoint replicationEndpoint;
// A filter (or a chain of filters) for the WAL entries.
- protected WALEntryFilter walEntryFilter;
+ protected volatile WALEntryFilter walEntryFilter;
// throttler
private ReplicationThrottler throttler;
private long defaultBandwidth;
@@ -197,7 +195,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
if (queue == null) {
queue = new PriorityBlockingQueue<>(queueSizePerGroup, new LogsComparator());
queues.put(logPrefix, queue);
- if (this.isSourceActive() && this.replicationEndpoint != null) {
+ if (this.isSourceActive() && this.walEntryFilter != null) {
// new wal group observed after source startup, start a new worker thread to track it
// notice: it's possible that log enqueued when this.running is set but worker thread
// still not launched, so it's necessary to check workerThreads before start the worker
@@ -282,7 +280,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
replicationEndpoint.awaitRunning(waitOnEndpointSeconds, TimeUnit.SECONDS);
}
- private void initializeWALEntryFilter() {
+ private void initializeWALEntryFilter(UUID peerClusterId) {
// get the WALEntryFilter from ReplicationEndpoint and add it to default filters
ArrayList<WALEntryFilter> filters =
Lists.<WALEntryFilter> newArrayList(new SystemTableWALEntryFilter());
@@ -430,13 +428,16 @@ public class ReplicationSource implements ReplicationSourceInterface {
}
sleepMultiplier = 1;
+ UUID peerClusterId;
// delay this until we are in an asynchronous thread
- while (this.isSourceActive() && this.peerClusterId == null) {
- this.peerClusterId = replicationEndpoint.getPeerUUID();
- if (this.isSourceActive() && this.peerClusterId == null) {
+ for (;;) {
+ peerClusterId = replicationEndpoint.getPeerUUID();
+ if (this.isSourceActive() && peerClusterId == null) {
if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
sleepMultiplier++;
}
+ } else {
+ break;
}
}
@@ -451,7 +452,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
}
LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
- initializeWALEntryFilter();
+ initializeWALEntryFilter(peerClusterId);
// start workers
for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
String walGroupId = entry.getKey();
http://git-wip-us.apache.org/repos/asf/hbase/blob/ee540c9f/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java
new file mode 100644
index 0000000..3ef9215
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.UUID;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALProvider;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Testcase for HBASE-20624: ReplicationSource race that left walEntryFilter null for a new shipper.
+ */
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestRaceWhenCreatingReplicationSource {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestRaceWhenCreatingReplicationSource.class);
+ // Shared testing utility; starts and stops the mini cluster used by this class.
+ private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+ // Id of the replication peer that is backed by LocalReplicationEndpoint.
+ private static String PEER_ID = "1";
+ // Table created by testRace(); its only family uses REPLICATION_SCOPE_GLOBAL.
+ private static TableName TABLE_NAME = TableName.valueOf("race");
+ // Column family of the single cell written by testRace().
+ private static byte[] CF = Bytes.toBytes("CF");
+ // Column qualifier of the single cell written by testRace().
+ private static byte[] CQ = Bytes.toBytes("CQ");
+ // Test file system obtained from UTIL; LOG_PATH lives on it.
+ private static FileSystem FS;
+ // File named "replicated" in the test data dir; receives the entries shipped to the endpoint.
+ private static Path LOG_PATH;
+ // WAL writer opened on LOG_PATH; replicate() synchronizes on it before appending.
+ private static WALProvider.Writer WRITER;
+ // While true, LocalReplicationEndpoint.getPeerUUID() returns null; testRace() sets it to false.
+ private static volatile boolean NULL_UUID = true;
+
+ public static final class LocalReplicationEndpoint extends BaseReplicationEndpoint {
+ // Test endpoint: hides its peer UUID while NULL_UUID is set; appends shipped entries to WRITER.
+ private static final UUID PEER_UUID = UUID.randomUUID();
+ // Returns null while NULL_UUID is true, otherwise the fixed random PEER_UUID.
+ @Override
+ public UUID getPeerUUID() {
+ if (NULL_UUID) {
+ return null;
+ } else {
+ return PEER_UUID;
+ }
+ }
+ // Appends every entry of the batch to WRITER, then syncs; an IOException is rethrown unchecked.
+ @Override
+ public boolean replicate(ReplicateContext replicateContext) {
+ synchronized (WRITER) {
+ try {
+ for (Entry entry : replicateContext.getEntries()) {
+ WRITER.append(entry);
+ }
+ WRITER.sync(false);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+ return true;
+ }
+ // start() and stop() only delegate to startAsync() and stopAsync().
+ @Override
+ public void start() {
+ startAsync();
+ }
+
+ @Override
+ public void stop() {
+ stopAsync();
+ }
+ // Calls notifyStarted() immediately; this endpoint has nothing else to set up.
+ @Override
+ protected void doStart() {
+ notifyStarted();
+ }
+ // Calls notifyStopped() immediately; the shared WRITER is not closed here.
+ @Override
+ protected void doStop() {
+ notifyStopped();
+ }
+ }
+ // Selects the multiwal provider, starts the mini cluster, opens WRITER and adds peer PEER_ID.
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ UTIL.getConfiguration().set(WALFactory.WAL_PROVIDER, "multiwal");
+ // make sure that we will create a new group for the table
+ UTIL.getConfiguration().setInt("hbase.wal.regiongrouping.numgroups", 8);
+ UTIL.startMiniCluster(3);
+ Path dir = UTIL.getDataTestDirOnTestFS();
+ FS = UTIL.getTestFileSystem();
+ LOG_PATH = new Path(dir, "replicated"); // the file that testRace() later reads back
+ WRITER = WALFactory.createWALWriter(FS, LOG_PATH, UTIL.getConfiguration());
+ UTIL.getAdmin().addReplicationPeer(PEER_ID,
+ ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase")
+ .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).build(),
+ true);
+ }
+ // Shuts the mini cluster down. NOTE(review): WRITER is never closed in this class - confirm OK.
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ UTIL.shutdownMiniCluster();
+ }
+ // Writes one cell while getPeerUUID() still returns null, then expects it in LOG_PATH afterwards.
+ @Test
+ public void testRace() throws Exception {
+ UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
+ // True once every region server has a source for PEER_ID with a non-null endpoint.
+ @Override
+ public boolean evaluate() throws Exception {
+ for (RegionServerThread t : UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
+ ReplicationSource source =
+ (ReplicationSource) ((Replication) t.getRegionServer().getReplicationSourceService())
+ .getReplicationManager().getSource(PEER_ID);
+ if (source == null || source.getReplicationEndpoint() == null) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public String explainFailure() throws Exception {
+ return "Replication source has not been initialized yet";
+ }
+ });
+ UTIL.getAdmin().createTable(
+ TableDescriptorBuilder.newBuilder(TABLE_NAME).setColumnFamily(ColumnFamilyDescriptorBuilder
+ .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build());
+ UTIL.waitTableAvailable(TABLE_NAME);
+ try (Table table = UTIL.getConnection().getTable(TABLE_NAME)) {
+ table.put(new Put(Bytes.toBytes(1)).addColumn(CF, CQ, Bytes.toBytes(1)));
+ }
+ NULL_UUID = false; // from here on getPeerUUID() returns PEER_UUID instead of null
+ UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
+ // True once an entry can be read from LOG_PATH; an IOException is treated as "not yet".
+ @Override
+ public boolean evaluate() throws Exception {
+ try (WAL.Reader reader = WALFactory.createReader(FS, LOG_PATH, UTIL.getConfiguration())) {
+ return reader.next() != null;
+ } catch (IOException e) {
+ return false;
+ }
+ }
+
+ @Override
+ public String explainFailure() throws Exception {
+ return "Replication has not catched up";
+ }
+ });
+ try (WAL.Reader reader = WALFactory.createReader(FS, LOG_PATH, UTIL.getConfiguration())) {
+ Cell cell = reader.next().getEdit().getCells().get(0); // first cell of the first entry
+ assertEquals(1, Bytes.toInt(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
+ assertArrayEquals(CF, CellUtil.cloneFamily(cell));
+ assertArrayEquals(CQ, CellUtil.cloneQualifier(cell));
+ assertEquals(1,
+ Bytes.toInt(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
+ }
+ }
+}