You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/05/24 03:27:38 UTC

[10/37] hbase git commit: HBASE-20624 Race in ReplicationSource which causes walEntryFilter being null when creating new shipper

HBASE-20624 Race in ReplicationSource which causes walEntryFilter being null when creating new shipper


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ee540c9f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ee540c9f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ee540c9f

Branch: refs/heads/HBASE-19064
Commit: ee540c9f9ec83def56ad15138b636271c761ed3a
Parents: c253f8f
Author: zhangduo <zh...@apache.org>
Authored: Wed May 23 21:24:49 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Thu May 24 10:48:29 2018 +0800

----------------------------------------------------------------------
 .../regionserver/ReplicationSource.java         |  19 +-
 .../TestRaceWhenCreatingReplicationSource.java  | 208 +++++++++++++++++++
 2 files changed, 218 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/ee540c9f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index b05a673..4051efe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -101,8 +101,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
   protected FileSystem fs;
   // id of this cluster
   private UUID clusterId;
-  // id of the other cluster
-  private UUID peerClusterId;
   // total number of edits we replicated
   private AtomicLong totalReplicatedEdits = new AtomicLong(0);
   // The znode we currently play with
@@ -118,7 +116,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
   // ReplicationEndpoint which will handle the actual replication
   private volatile ReplicationEndpoint replicationEndpoint;
   // A filter (or a chain of filters) for the WAL entries.
-  protected WALEntryFilter walEntryFilter;
+  protected volatile WALEntryFilter walEntryFilter;
   // throttler
   private ReplicationThrottler throttler;
   private long defaultBandwidth;
@@ -197,7 +195,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
     if (queue == null) {
       queue = new PriorityBlockingQueue<>(queueSizePerGroup, new LogsComparator());
       queues.put(logPrefix, queue);
-      if (this.isSourceActive() && this.replicationEndpoint != null) {
+      if (this.isSourceActive() && this.walEntryFilter != null) {
         // new wal group observed after source startup, start a new worker thread to track it
         // notice: it's possible that log enqueued when this.running is set but worker thread
         // still not launched, so it's necessary to check workerThreads before start the worker
@@ -282,7 +280,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
     replicationEndpoint.awaitRunning(waitOnEndpointSeconds, TimeUnit.SECONDS);
   }
 
-  private void initializeWALEntryFilter() {
+  private void initializeWALEntryFilter(UUID peerClusterId) {
     // get the WALEntryFilter from ReplicationEndpoint and add it to default filters
     ArrayList<WALEntryFilter> filters =
       Lists.<WALEntryFilter> newArrayList(new SystemTableWALEntryFilter());
@@ -430,13 +428,16 @@ public class ReplicationSource implements ReplicationSourceInterface {
     }
 
     sleepMultiplier = 1;
+    UUID peerClusterId;
     // delay this until we are in an asynchronous thread
-    while (this.isSourceActive() && this.peerClusterId == null) {
-      this.peerClusterId = replicationEndpoint.getPeerUUID();
-      if (this.isSourceActive() && this.peerClusterId == null) {
+    for (;;) {
+      peerClusterId = replicationEndpoint.getPeerUUID();
+      if (this.isSourceActive() && peerClusterId == null) {
         if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
           sleepMultiplier++;
         }
+      } else {
+        break;
       }
     }
 
@@ -451,7 +452,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
     }
     LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
 
-    initializeWALEntryFilter();
+    initializeWALEntryFilter(peerClusterId);
     // start workers
     for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
       String walGroupId = entry.getKey();

http://git-wip-us.apache.org/repos/asf/hbase/blob/ee540c9f/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java
new file mode 100644
index 0000000..3ef9215
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRaceWhenCreatingReplicationSource.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.UUID;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALProvider;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Testcase for HBASE-20624.
+ * <p>
+ * The test registers a peer whose endpoint ({@link LocalReplicationEndpoint}) keeps returning a
+ * {@code null} peer UUID, so the replication source has an endpoint but cannot finish its
+ * initialization yet. While the source is in that state a table with a global replication scope
+ * is created and written to. Only afterwards is the peer UUID made available, and the test then
+ * verifies that the edit is still delivered to the endpoint.
+ */
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestRaceWhenCreatingReplicationSource {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestRaceWhenCreatingReplicationSource.class);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static final String PEER_ID = "1";
+
+  private static final TableName TABLE_NAME = TableName.valueOf("race");
+
+  private static final byte[] CF = Bytes.toBytes("CF");
+
+  private static final byte[] CQ = Bytes.toBytes("CQ");
+
+  // The fields below are assigned once in setUpBeforeClass, so they cannot be final.
+  private static FileSystem FS;
+
+  // File that receives every entry handed to LocalReplicationEndpoint#replicate.
+  private static Path LOG_PATH;
+
+  // Shared by all endpoint instances; also used as the monitor that serializes appends.
+  private static WALProvider.Writer WRITER;
+
+  // While true, LocalReplicationEndpoint#getPeerUUID returns null.
+  private static volatile boolean NULL_UUID = true;
+
+  /**
+   * Endpoint that writes every replicated entry to {@link #WRITER} and that reports no peer UUID
+   * for as long as {@link #NULL_UUID} is set.
+   */
+  public static final class LocalReplicationEndpoint extends BaseReplicationEndpoint {
+
+    private static final UUID PEER_UUID = UUID.randomUUID();
+
+    /**
+     * @return {@code null} while {@link #NULL_UUID} is {@code true}, otherwise a fixed random UUID
+     */
+    @Override
+    public UUID getPeerUUID() {
+      if (NULL_UUID) {
+        return null;
+      } else {
+        return PEER_UUID;
+      }
+    }
+
+    /**
+     * Appends all entries of the batch to the shared writer and syncs it.
+     * @return always {@code true}
+     * @throws UncheckedIOException if appending or syncing fails
+     */
+    @Override
+    public boolean replicate(ReplicateContext replicateContext) {
+      // The writer is a single static instance, so appends are serialized on it.
+      synchronized (WRITER) {
+        try {
+          for (Entry entry : replicateContext.getEntries()) {
+            WRITER.append(entry);
+          }
+          WRITER.sync(false);
+        } catch (IOException e) {
+          throw new UncheckedIOException(e);
+        }
+      }
+      return true;
+    }
+
+    @Override
+    public void start() {
+      startAsync();
+    }
+
+    @Override
+    public void stop() {
+      stopAsync();
+    }
+
+    @Override
+    protected void doStart() {
+      notifyStarted();
+    }
+
+    @Override
+    protected void doStop() {
+      notifyStopped();
+    }
+  }
+
+  /**
+   * Starts a three node mini cluster using the "multiwal" WAL provider, creates the writer that
+   * the endpoint appends to, and registers the peer backed by {@link LocalReplicationEndpoint}.
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    UTIL.getConfiguration().set(WALFactory.WAL_PROVIDER, "multiwal");
+    // make sure that we will create a new group for the table
+    UTIL.getConfiguration().setInt("hbase.wal.regiongrouping.numgroups", 8);
+    UTIL.startMiniCluster(3);
+    Path dir = UTIL.getDataTestDirOnTestFS();
+    FS = UTIL.getTestFileSystem();
+    LOG_PATH = new Path(dir, "replicated");
+    WRITER = WALFactory.createWALWriter(FS, LOG_PATH, UTIL.getConfiguration());
+    UTIL.getAdmin().addReplicationPeer(PEER_ID,
+      ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase")
+        .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).build(),
+      true);
+  }
+
+  /**
+   * Closes the writer opened in {@link #setUpBeforeClass()} and shuts the mini cluster down.
+   */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    try {
+      // WRITER is still null if setUpBeforeClass failed before creating it.
+      if (WRITER != null) {
+        // Close before the cluster (and with it the test file system) goes away, and take the
+        // same monitor as replicate() so that we never close in the middle of an append.
+        synchronized (WRITER) {
+          WRITER.close();
+        }
+      }
+    } finally {
+      // Always stop the cluster, even if closing the writer failed.
+      UTIL.shutdownMiniCluster();
+    }
+  }
+
+  @Test
+  public void testRace() throws Exception {
+    // Step 1: wait until every region server has a source for the peer with its endpoint set.
+    // The endpoint still returns a null peer UUID at this point.
+    UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
+
+      @Override
+      public boolean evaluate() throws Exception {
+        for (RegionServerThread t : UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
+          ReplicationSource source =
+            (ReplicationSource) ((Replication) t.getRegionServer().getReplicationSourceService())
+              .getReplicationManager().getSource(PEER_ID);
+          if (source == null || source.getReplicationEndpoint() == null) {
+            return false;
+          }
+        }
+        return true;
+      }
+
+      @Override
+      public String explainFailure() throws Exception {
+        return "Replication source has not been initialized yet";
+      }
+    });
+    // Step 2: create a replicated table and write one cell while the peer UUID is still null.
+    UTIL.getAdmin().createTable(
+      TableDescriptorBuilder.newBuilder(TABLE_NAME).setColumnFamily(ColumnFamilyDescriptorBuilder
+        .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build());
+    UTIL.waitTableAvailable(TABLE_NAME);
+    try (Table table = UTIL.getConnection().getTable(TABLE_NAME)) {
+      table.put(new Put(Bytes.toBytes(1)).addColumn(CF, CQ, Bytes.toBytes(1)));
+    }
+    // Step 3: let the endpoint report its peer UUID from now on.
+    NULL_UUID = false;
+    // Step 4: wait until at least one entry can be read back from the file the endpoint writes.
+    UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
+
+      @Override
+      public boolean evaluate() throws Exception {
+        try (WAL.Reader reader = WALFactory.createReader(FS, LOG_PATH, UTIL.getConfiguration())) {
+          return reader.next() != null;
+        } catch (IOException e) {
+          // Treated as "not there yet" so that the wait keeps polling; presumably the file is
+          // not readable until the first sync.
+          return false;
+        }
+      }
+
+      @Override
+      public String explainFailure() throws Exception {
+        return "Replication has not catched up";
+      }
+    });
+    // Step 5: the first replicated cell must be exactly the one written above.
+    try (WAL.Reader reader = WALFactory.createReader(FS, LOG_PATH, UTIL.getConfiguration())) {
+      Cell cell = reader.next().getEdit().getCells().get(0);
+      assertEquals(1, Bytes.toInt(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
+      assertArrayEquals(CF, CellUtil.cloneFamily(cell));
+      assertArrayEquals(CQ, CellUtil.cloneQualifier(cell));
+      assertEquals(1,
+        Bytes.toInt(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
+    }
+  }
+}