You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2018/10/10 07:47:03 UTC

hbase git commit: HBASE-21277 Prevent to add same table to two sync replication peer's config

Repository: hbase
Updated Branches:
  refs/heads/master f12232875 -> a1f28f3ca


HBASE-21277 Prevent to add same table to two sync replication peer's config


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a1f28f3c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a1f28f3c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a1f28f3c

Branch: refs/heads/master
Commit: a1f28f3ca77d1cd9777d694e246b57d642dfcdb8
Parents: f122328
Author: Guanghao Zhang <zg...@apache.org>
Authored: Mon Oct 8 19:04:02 2018 +0800
Committer: Guanghao Zhang <zg...@apache.org>
Committed: Wed Oct 10 15:43:16 2018 +0800

----------------------------------------------------------------------
 .../master/replication/AddPeerProcedure.java    |  14 ++-
 .../master/replication/ModifyPeerProcedure.java |  10 +-
 .../replication/ReplicationPeerManager.java     |  29 +++++
 .../TestReplicationAdminForSyncReplication.java | 125 +++++++++++++++++++
 4 files changed, 172 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/a1f28f3c/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java
index 2f2d5a5..f1c07aa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
@@ -80,12 +81,23 @@ public class AddPeerProcedure extends ModifyPeerProcedure {
   }
 
   @Override
+  protected void releaseLatch(MasterProcedureEnv env) {
+    if (peerConfig.isSyncReplication()) {
+      env.getReplicationPeerManager().releaseSyncReplicationPeerLock();
+    }
+    ProcedurePrepareLatch.releaseLatch(latch, this);
+  }
+
+  @Override
   protected void prePeerModification(MasterProcedureEnv env)
-      throws IOException, ReplicationException {
+      throws IOException, ReplicationException, InterruptedException {
     MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
     if (cpHost != null) {
       cpHost.preAddReplicationPeer(peerId, peerConfig);
     }
+    if (peerConfig.isSyncReplication()) {
+      env.getReplicationPeerManager().acquireSyncReplicationPeerLock();
+    }
     env.getReplicationPeerManager().preAddPeer(peerId, peerConfig);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/a1f28f3c/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
index 1aa86ed..9550fb0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
@@ -75,7 +75,7 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
    * all checks passes then the procedure can not be rolled back any more.
    */
   protected abstract void prePeerModification(MasterProcedureEnv env)
-      throws IOException, ReplicationException;
+      throws IOException, ReplicationException, InterruptedException;
 
   protected abstract void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException;
 
@@ -91,7 +91,7 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
   protected abstract void postPeerModification(MasterProcedureEnv env)
       throws IOException, ReplicationException;
 
-  private void releaseLatch() {
+  protected void releaseLatch(MasterProcedureEnv env) {
     ProcedurePrepareLatch.releaseLatch(latch, this);
   }
 
@@ -241,7 +241,7 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
 
   @Override
   protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState state)
-      throws ProcedureSuspendedException {
+      throws ProcedureSuspendedException, InterruptedException {
     switch (state) {
       case PRE_PEER_MODIFICATION:
         try {
@@ -250,7 +250,7 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
           LOG.warn("{} failed to call pre CP hook or the pre check is failed for peer {}, " +
             "mark the procedure as failure and give up", getClass().getName(), peerId, e);
           setFailure("master-" + getPeerOperationType().name().toLowerCase() + "-peer", e);
-          releaseLatch();
+          releaseLatch(env);
           return Flow.NO_MORE_STATE;
         } catch (ReplicationException e) {
           long backoff = ProcedureUtil.getBackoffTimeMs(attempts);
@@ -330,7 +330,7 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
           LOG.warn("{} failed to call post CP hook for peer {}, " +
             "ignore since the procedure has already done", getClass().getName(), peerId, e);
         }
-        releaseLatch();
+        releaseLatch(env);
         return Flow.NO_MORE_STATE;
       default:
         throw new UnsupportedOperationException("unhandled state=" + state);

http://git-wip-us.apache.org/repos/asf/hbase/blob/a1f28f3c/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
index 8e49137..80338e2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -28,6 +28,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Semaphore;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
@@ -75,6 +76,9 @@ public class ReplicationPeerManager {
       SyncReplicationState.DOWNGRADE_ACTIVE,
       EnumSet.of(SyncReplicationState.STANDBY, SyncReplicationState.ACTIVE)));
 
+  // Only allow to add one sync replication peer concurrently
+  private final Semaphore syncReplicationPeerLock = new Semaphore(1);
+
   ReplicationPeerManager(ReplicationPeerStorage peerStorage, ReplicationQueueStorage queueStorage,
       ConcurrentMap<String, ReplicationPeerDescription> peers) {
     this.peerStorage = peerStorage;
@@ -105,6 +109,9 @@ public class ReplicationPeerManager {
       throw new DoNotRetryIOException("Found invalid peer name: " + peerId);
     }
     checkPeerConfig(peerConfig);
+    if (peerConfig.isSyncReplication()) {
+      checkSyncReplicationPeerConfigConflict(peerConfig);
+    }
     if (peers.containsKey(peerId)) {
       throw new DoNotRetryIOException("Replication peer " + peerId + " already exists");
     }
@@ -385,6 +392,7 @@ public class ReplicationPeerManager {
           "Only support replicated table config for sync replication peer");
       }
     }
+
     Path remoteWALDir = new Path(peerConfig.getRemoteWALDir());
     if (!remoteWALDir.isAbsolute()) {
       throw new DoNotRetryIOException(
@@ -397,6 +405,19 @@ public class ReplicationPeerManager {
     }
   }
 
+  private void checkSyncReplicationPeerConfigConflict(ReplicationPeerConfig peerConfig)
+      throws DoNotRetryIOException {
+    for (TableName tableName : peerConfig.getTableCFsMap().keySet()) {
+      for (Map.Entry<String, ReplicationPeerDescription> entry : peers.entrySet()) {
+        ReplicationPeerConfig rpc = entry.getValue().getPeerConfig();
+        if (rpc.isSyncReplication() && rpc.getTableCFsMap().containsKey(tableName)) {
+          throw new DoNotRetryIOException(
+              "Table " + tableName + " has been replicated by peer " + entry.getKey());
+        }
+      }
+    }
+  }
+
   /**
    * Set a namespace in the peer config means that all tables in this namespace will be replicated
    * to the peer cluster.
@@ -493,4 +514,12 @@ public class ReplicationPeerManager {
     }
     return s1.equals(s2);
   }
+
+  public void acquireSyncReplicationPeerLock() throws InterruptedException {
+    syncReplicationPeerLock.acquire();
+  }
+
+  public void releaseSyncReplicationPeerLock() {
+    syncReplicationPeerLock.release();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/a1f28f3c/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminForSyncReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminForSyncReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminForSyncReplication.java
new file mode 100644
index 0000000..31fd53a
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminForSyncReplication.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.replication;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({MediumTests.class, ClientTests.class})
+public class TestReplicationAdminForSyncReplication {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestReplicationAdminForSyncReplication.class);
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestReplicationAdminForSyncReplication.class);
+
+  private final static HBaseTestingUtility TEST_UTIL =
+      new HBaseTestingUtility();
+
+  private static Admin hbaseAdmin;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
+    TEST_UTIL.startMiniCluster();
+    hbaseAdmin = TEST_UTIL.getAdmin();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    hbaseAdmin.close();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testAddPeerWithSameTable() throws Exception {
+    TableName tableName = TableName.valueOf("testAddPeerWithSameTable");
+    TEST_UTIL.createTable(tableName, Bytes.toBytes("family"));
+
+    boolean[] success = { true, true, true, true, true, true };
+    Thread[] threads = new Thread[5];
+    for (int i = 0; i < 5; i++) {
+      String peerId = "id" + i;
+      String clusterKey = "127.0.0.1:2181:/hbase" + i;
+      int index = i;
+      threads[i] = new Thread(() -> {
+        try {
+          hbaseAdmin
+              .addReplicationPeer(peerId, buildSyncReplicationPeerConfig(clusterKey, tableName));
+        } catch (IOException e) {
+          LOG.error("Failed to add replication peer " + peerId);
+          success[index] = false;
+        }
+      });
+    }
+    for (int i = 0; i < 5; i++) {
+      threads[i].start();
+    }
+    for (int i = 0; i < 5; i++) {
+      threads[i].join();
+    }
+
+    int successCount = 0;
+    for (int i = 0; i < 5; i++) {
+      if (success[i]) {
+        successCount++;
+      }
+    }
+    assertEquals("Only one peer can be added successfully", 1, successCount);
+  }
+
+  private ReplicationPeerConfig buildSyncReplicationPeerConfig(String clusterKey,
+      TableName tableName) throws IOException {
+    Path rootDir = TEST_UTIL.getDataTestDirOnTestFS("remoteWAL");
+    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
+    builder.setClusterKey(clusterKey);
+    builder.setRemoteWALDir(rootDir.makeQualified(TEST_UTIL.getTestFileSystem().getUri(),
+        TEST_UTIL.getTestFileSystem().getWorkingDirectory()).toString());
+    builder.setReplicateAllUserTables(false);
+    Map<TableName, List<String>> tableCfs = new HashMap<>();
+    tableCfs.put(tableName, new ArrayList<>());
+    builder.setTableCFsMap(tableCfs);
+    return builder.build();
+  }
+}