You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2019/02/08 11:49:02 UTC

[hbase] branch branch-2.1 updated: HBASE-21857 Do not need to check clusterKey if replicationEndpoint is provided when adding a peer

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 16f27d7  HBASE-21857 Do not need to check clusterKey if replicationEndpoint is provided when adding a peer
16f27d7 is described below

commit 16f27d7699994b5c6494cb2a51071ca2d6dfa38c
Author: zhangduo <zh...@apache.org>
AuthorDate: Fri Feb 8 09:56:52 2019 +0800

    HBASE-21857 Do not need to check clusterKey if replicationEndpoint is provided when adding a peer
    
    Signed-off-by: Xu Cang <xu...@apache.org>
---
 .../replication/ReplicationPeerConfigUtil.java     |   6 +-
 .../src/main/protobuf/Replication.proto            |   2 +-
 .../master/replication/ReplicationPeerManager.java |  24 ++++-
 .../hbase/client/TestAsyncReplicationAdminApi.java | 109 +++++++++++++++++++--
 4 files changed, 126 insertions(+), 15 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
index a1e7d6c..80b4514 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
@@ -325,9 +325,9 @@ public final class ReplicationPeerConfigUtil {
   public static ReplicationProtos.ReplicationPeer convert(ReplicationPeerConfig peerConfig) {
     ReplicationProtos.ReplicationPeer.Builder builder =
         ReplicationProtos.ReplicationPeer.newBuilder();
-    if (peerConfig.getClusterKey() != null) {
-      builder.setClusterkey(peerConfig.getClusterKey());
-    }
+    // we used to set cluster key as required so here we must always set it, until we can make sure
+    // that no one uses the old proto file.
+    builder.setClusterkey(peerConfig.getClusterKey() != null ? peerConfig.getClusterKey() : "");
     if (peerConfig.getReplicationEndpointImpl() != null) {
       builder.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl());
     }
diff --git a/hbase-protocol-shaded/src/main/protobuf/Replication.proto b/hbase-protocol-shaded/src/main/protobuf/Replication.proto
index 557b87c..1d483ce 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Replication.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Replication.proto
@@ -38,7 +38,7 @@ message TableCF {
 message ReplicationPeer {
   // clusterkey is the concatenation of the slave cluster's
   // hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
-  required string clusterkey = 1;
+  optional string clusterkey = 1;
   optional string replicationEndpointImpl = 2;
   repeated BytesBytesPair data = 3;
   repeated NameStringPair configuration = 4;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
index 87d0111..af00aae 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
@@ -43,6 +44,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
+import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
 import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -242,7 +244,27 @@ public class ReplicationPeerManager {
   }
 
   private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws DoNotRetryIOException {
-    checkClusterKey(peerConfig.getClusterKey());
+    String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
+    boolean checkClusterKey = true;
+    if (!StringUtils.isBlank(replicationEndpointImpl)) {
+      // try creating an instance
+      ReplicationEndpoint endpoint;
+      try {
+        endpoint = Class.forName(replicationEndpointImpl)
+          .asSubclass(ReplicationEndpoint.class).getDeclaredConstructor().newInstance();
+      } catch (Exception e) {
+        throw new DoNotRetryIOException(
+          "Can not instantiate configured replication endpoint class=" + replicationEndpointImpl,
+          e);
+      }
+      // do not check cluster key if the endpoint is not an HBaseInterClusterReplicationEndpoint
+      if (!(endpoint instanceof HBaseInterClusterReplicationEndpoint)) {
+        checkClusterKey = false;
+      }
+    }
+    if (checkClusterKey) {
+      checkClusterKey(peerConfig.getClusterKey());
+    }
 
     if (peerConfig.replicateAllUserTables()) {
       // If replicate_all flag is true, it means all user tables will be replicated to peer cluster.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
index b5a50c0..e33fd21 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
@@ -18,10 +18,13 @@
 package org.apache.hadoop.hbase.client;
 
 import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.startsWith;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -32,16 +35,25 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
+import org.apache.hadoop.hbase.replication.WALEntryFilter;
+import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.junit.After;
@@ -56,12 +68,12 @@ import org.junit.runners.Parameterized;
  * Class to test asynchronous replication admin operations.
  */
 @RunWith(Parameterized.class)
-@Category({LargeTests.class, ClientTests.class})
+@Category({ LargeTests.class, ClientTests.class })
 public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestAsyncReplicationAdminApi.class);
+    HBaseClassTestRule.forClass(TestAsyncReplicationAdminApi.class);
 
   private final String ID_ONE = "1";
   private final String KEY_ONE = "127.0.0.1:2181:/hbase";
@@ -89,7 +101,7 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
     } catch (Exception e) {
     }
     ReplicationQueueStorage queueStorage = ReplicationStorageFactory
-        .getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), TEST_UTIL.getConfiguration());
+      .getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), TEST_UTIL.getConfiguration());
     for (ServerName serverName : queueStorage.getListOfReplicators()) {
       for (String queue : queueStorage.getAllQueues(serverName)) {
         queueStorage.removeQueue(serverName, queue);
@@ -186,8 +198,8 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
     // append table t1 to replication
     tableCFs.put(tableName1, null);
     admin.appendReplicationPeerTableCFs(ID_ONE, tableCFs).join();
-    Map<TableName, List<String>> result = admin.getReplicationPeerConfig(ID_ONE).get()
-        .getTableCFsMap();
+    Map<TableName, List<String>> result =
+      admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
     assertEquals(1, result.size());
     assertEquals(true, result.containsKey(tableName1));
     assertNull(result.get(tableName1));
@@ -301,12 +313,13 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
       tableCFs.clear();
       tableCFs.put(tableName3, null);
       admin.removeReplicationPeerTableCFs(ID_ONE, tableCFs).join();
-      fail("Test case should fail as removing table-cfs from a peer whose table-cfs didn't contain t3");
+      fail("Test case should fail as removing table-cfs from a peer whose" +
+        " table-cfs didn't contain t3");
     } catch (CompletionException e) {
       assertTrue(e.getCause() instanceof ReplicationException);
     }
-    Map<TableName, List<String>> result = admin.getReplicationPeerConfig(ID_ONE).get()
-        .getTableCFsMap();
+    Map<TableName, List<String>> result =
+      admin.getReplicationPeerConfig(ID_ONE).get().getTableCFsMap();
     assertEquals(2, result.size());
     assertTrue("Should contain t1", result.containsKey(tableName1));
     assertTrue("Should contain t2", result.containsKey(tableName2));
@@ -414,7 +427,8 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
     rpc.setTableCFsMap(tableCfs);
     try {
       admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
-      fail("Test case should fail, because table " + tableName1 + " conflict with namespace " + ns1);
+      fail(
+        "Test case should fail, because table " + tableName1 + " conflict with namespace " + ns1);
     } catch (CompletionException e) {
       // OK
     }
@@ -430,7 +444,8 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
     rpc.setNamespaces(namespaces);
     try {
       admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
-      fail("Test case should fail, because namespace " + ns2 + " conflict with table " + tableName2);
+      fail(
+        "Test case should fail, because namespace " + ns2 + " conflict with table " + tableName2);
     } catch (CompletionException e) {
       // OK
     }
@@ -453,4 +468,78 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
 
     admin.removeReplicationPeer(ID_ONE).join();
   }
+
+  @Test
+  public void testInvalidClusterKey() throws InterruptedException {
+    try {
+      admin.addReplicationPeer(ID_ONE,
+        ReplicationPeerConfig.newBuilder().setClusterKey("whatever").build()).get();
+      fail();
+    } catch (ExecutionException e) {
+      assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class));
+    }
+  }
+
+  @Test
+  public void testInvalidReplicationEndpoint() throws InterruptedException {
+    try {
+      admin.addReplicationPeer(ID_ONE,
+        ReplicationPeerConfig.newBuilder().setReplicationEndpointImpl("whatever").build()).get();
+      fail();
+    } catch (ExecutionException e) {
+      assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class));
+      assertThat(e.getCause().getMessage(), startsWith("Can not instantiate"));
+    }
+  }
+
+  public static final class DummyReplicationEndpoint extends BaseReplicationEndpoint {
+
+    @Override
+    public UUID getPeerUUID() {
+      return ctx.getClusterId();
+    }
+
+    @Override
+    public boolean replicate(ReplicateContext replicateContext) {
+      return true;
+    }
+
+    @Override
+    public void start() {
+      startAsync();
+    }
+
+    @Override
+    public void stop() {
+      stopAsync();
+    }
+
+    @Override
+    protected void doStart() {
+      notifyStarted();
+    }
+
+    @Override
+    protected void doStop() {
+      notifyStopped();
+    }
+  }
+
+  @Test
+  public void testSetReplicationEndpoint() throws InterruptedException, ExecutionException {
+    // make sure that we do not need to set cluster key when we use customized ReplicationEndpoint
+    admin.addReplicationPeer(ID_ONE, ReplicationPeerConfig.newBuilder()
+      .setReplicationEndpointImpl(DummyReplicationEndpoint.class.getName()).build()).get();
+
+    // but we still need to check cluster key if we specify the default ReplicationEndpoint
+    try {
+      admin
+        .addReplicationPeer(ID_TWO, ReplicationPeerConfig.newBuilder()
+          .setReplicationEndpointImpl(HBaseInterClusterReplicationEndpoint.class.getName()).build())
+        .get();
+      fail();
+    } catch (ExecutionException e) {
+      assertThat(e.getCause(), instanceOf(DoNotRetryIOException.class));
+    }
+  }
 }