You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bh...@apache.org on 2020/09/10 17:11:30 UTC

[hbase] branch master updated: HBASE-24764: Add support of adding default peer configs via hbase-site.xml for all replication peers. (#2284)

This is an automated email from the ASF dual-hosted git repository.

bharathv pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c5dbb2  HBASE-24764: Add support of adding default peer configs via hbase-site.xml for all replication peers. (#2284)
9c5dbb2 is described below

commit 9c5dbb29c0821078cea6089849e535187a7f195d
Author: ankitjain64 <34...@users.noreply.github.com>
AuthorDate: Thu Sep 10 10:08:44 2020 -0700

    HBASE-24764: Add support of adding default peer configs via hbase-site.xml for all replication peers. (#2284)
    
    Signed-off-by: Bharath Vissapragada <bh...@apache.org>
---
 .../replication/ReplicationPeerConfigUtil.java     | 38 ++++++++++
 .../replication/TestZKReplicationPeerStorage.java  | 47 +++++++++++++
 .../master/replication/ReplicationPeerManager.java |  5 ++
 .../hbase/replication/TestMasterReplication.java   | 80 ++++++++++++++++++++++
 4 files changed, 170 insertions(+)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
index f569e47..c5dcd76 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import org.apache.hadoop.hbase.replication.SyncReplicationState;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 import org.slf4j.Logger;
@@ -60,6 +61,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
 public final class ReplicationPeerConfigUtil {
 
   private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerConfigUtil.class);
+  public static final String HBASE_REPLICATION_PEER_BASE_CONFIG =
+    "hbase.replication.peer.base.config";
 
   private ReplicationPeerConfigUtil() {}
 
@@ -450,6 +453,41 @@ public final class ReplicationPeerConfigUtil {
     return builder.build();
   }
 
+  /**
+   * Adds the base peer configs from {@code conf} to a copy of the given peer config,
+   * skipping every key the peer config already defines so that peer-level values win.
+   *
+   * The base configs are read from the {@link #HBASE_REPLICATION_PEER_BASE_CONFIG} property
+   * ("hbase.replication.peer.base.config"). Expected format is "k1=v1;k2=v2,v2_1": entries
+   * are separated by ';' and each entry is split on its first '=' only, so a value may
+   * contain '='. Keys and values are trimmed; empty entries are ignored.
+   *
+   * @param conf Configuration to read the base peer configs from
+   * @param receivedPeerConfig peer config whose own settings take precedence
+   * @return ReplicationPeerConfig containing updated configs.
+   */
+  public static ReplicationPeerConfig addBasePeerConfigsIfNotPresent(Configuration conf,
+    ReplicationPeerConfig receivedPeerConfig) {
+    String basePeerConfigs = conf.get(HBASE_REPLICATION_PEER_BASE_CONFIG, "");
+    ReplicationPeerConfigBuilder copiedPeerConfigBuilder = ReplicationPeerConfig.
+      newBuilder(receivedPeerConfig);
+    Map<String,String> receivedPeerConfigMap = receivedPeerConfig.getConfiguration();
+    if (basePeerConfigs.length() != 0) {
+      // limit(2) stops at the first '=' so the rest of the entry, '=' included, is the value.
+      Map<String, String> basePeerConfigMap = Splitter.on(';').trimResults().omitEmptyStrings()
+        .withKeyValueSeparator(Splitter.on('=').limit(2).trimResults()).split(basePeerConfigs);
+      for (Map.Entry<String,String> entry : basePeerConfigMap.entrySet()) {
+        String configName = entry.getKey();
+        String configValue = entry.getValue();
+        // Only override if base config does not exist in existing peer configs
+        if (!receivedPeerConfigMap.containsKey(configName)) {
+          copiedPeerConfigBuilder.putConfiguration(configName, configValue);
+        }
+      }
+    }
+    return copiedPeerConfigBuilder.build();
+  }
+
   public static ReplicationPeerConfig appendExcludeTableCFsToReplicationPeerConfig(
       Map<TableName, List<String>> excludeTableCfs, ReplicationPeerConfig peerConfig)
       throws ReplicationException {
diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java
index 0e7cd74..e7ee1e7 100644
--- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java
+++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -34,9 +35,12 @@ import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.stream.Stream;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseZKTestingUtility;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -215,4 +219,47 @@ public class TestZKReplicationPeerStorage {
     assertNotEquals(-1, ZKUtil.checkExists(UTIL.getZooKeeperWatcher(),
       STORAGE.getNewSyncReplicationStateNode(peerId)));
   }
+
+  /**
+   * Verifies that base peer configs are added to a peer config that lacks them and never
+   * replace a value the peer config already holds.
+   */
+  @Test
+  public void testBaseReplicationPeerConfig() {
+    String customPeerConfigKey = "hbase.xxx.custom_config";
+    String customPeerConfigValue = "test";
+    String customPeerConfigUpdatedValue = "testUpdated";
+    String customPeerConfigSecondKey = "hbase.xxx.custom_second_config";
+    String customPeerConfigSecondValue = "testSecond";
+    String customPeerConfigSecondUpdatedValue = "testSecondUpdated";
+
+    ReplicationPeerConfig existingReplicationPeerConfig = getConfig(1);
+    // custom config not present
+    assertNull(existingReplicationPeerConfig.getConfiguration().get(customPeerConfigKey));
+
+    // Work on a copy so the base config does not leak into the shared UTIL configuration.
+    Configuration conf = new Configuration(UTIL.getConfiguration());
+    conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
+      customPeerConfigKey.concat("=").concat(customPeerConfigValue).concat(";").
+        concat(customPeerConfigSecondKey).concat("=").concat(customPeerConfigSecondValue));
+    ReplicationPeerConfig updatedReplicationPeerConfig = ReplicationPeerConfigUtil.
+      addBasePeerConfigsIfNotPresent(conf, existingReplicationPeerConfig);
+
+    // validates base configs are present in replicationPeerConfig
+    assertEquals(customPeerConfigValue, updatedReplicationPeerConfig.getConfiguration().
+      get(customPeerConfigKey));
+    assertEquals(customPeerConfigSecondValue, updatedReplicationPeerConfig.getConfiguration().
+      get(customPeerConfigSecondKey));
+
+    // validates base configs do not override a value if the config is already present
+    conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
+      customPeerConfigKey.concat("=").concat(customPeerConfigUpdatedValue).concat(";").
+        concat(customPeerConfigSecondKey).concat("=").concat(customPeerConfigSecondUpdatedValue));
+    ReplicationPeerConfig replicationPeerConfigAfterValueUpdate = ReplicationPeerConfigUtil.
+      addBasePeerConfigsIfNotPresent(conf, updatedReplicationPeerConfig);
+    assertEquals(customPeerConfigValue, replicationPeerConfigAfterValueUpdate.
+      getConfiguration().get(customPeerConfigKey));
+    assertEquals(customPeerConfigSecondValue, replicationPeerConfigAfterValueUpdate.
+      getConfiguration().get(customPeerConfigSecondKey));
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
index 8a5733f..2c930e1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
 import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationException;
@@ -232,6 +233,7 @@ public class ReplicationPeerManager {
       // this should be a retry, just return
       return;
     }
+    peerConfig = ReplicationPeerConfigUtil.addBasePeerConfigsIfNotPresent(conf, peerConfig);
     ReplicationPeerConfig copiedPeerConfig = ReplicationPeerConfig.newBuilder(peerConfig).build();
     SyncReplicationState syncReplicationState =
       copiedPeerConfig.isSyncReplication() ? SyncReplicationState.DOWNGRADE_ACTIVE
@@ -544,6 +546,9 @@ public class ReplicationPeerManager {
     ConcurrentMap<String, ReplicationPeerDescription> peers = new ConcurrentHashMap<>();
     for (String peerId : peerStorage.listPeerIds()) {
       ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
+
+      peerConfig = ReplicationPeerConfigUtil.addBasePeerConfigsIfNotPresent(conf, peerConfig);
+      peerStorage.updatePeerConfig(peerId, peerConfig);
       boolean enabled = peerStorage.isPeerEnabled(peerId);
       SyncReplicationState state = peerStorage.getPeerSyncReplicationState(peerId);
       peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig, state));
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
index 7210fc5..b7e5edd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.Random;
 import java.util.concurrent.CountDownLatch;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -71,6 +72,7 @@ import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -441,6 +443,84 @@ public class TestMasterReplication {
     }
   }
 
+  /**
+   * Tests that base replication peer configs are applied on peer creation, that a value set
+   * through updateReplicationPeerConfig() overrides the base value, and that after a restart
+   * a newly added base config only reaches peers that do not define that key already.
+   */
+  @Test
+  public void testBasePeerConfigsForPeerMutations()
+    throws Exception {
+    LOG.info("testBasePeerConfigsForPeerMutations");
+    String firstCustomPeerConfigKey = "hbase.xxx.custom_config";
+    String firstCustomPeerConfigValue = "test";
+    String firstCustomPeerConfigUpdatedValue = "test_updated";
+
+    String secondCustomPeerConfigKey = "hbase.xxx.custom_second_config";
+    String secondCustomPeerConfigValue = "testSecond";
+    String secondCustomPeerConfigUpdatedValue = "testSecondUpdated";
+    try {
+      baseConfiguration.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
+        firstCustomPeerConfigKey.concat("=").concat(firstCustomPeerConfigValue));
+      startMiniClusters(2);
+      addPeer("1", 0, 1);
+      addPeer("2", 0, 1);
+      Admin admin = utilities[0].getAdmin();
+
+      // Validates base config 1 is present for both peers.
+      Assert.assertEquals(firstCustomPeerConfigValue, admin.getReplicationPeerConfig("1").
+        getConfiguration().get(firstCustomPeerConfigKey));
+      Assert.assertEquals(firstCustomPeerConfigValue, admin.getReplicationPeerConfig("2").
+        getConfiguration().get(firstCustomPeerConfigKey));
+
+      // Override the value of configuration 1 for peer "1".
+      ReplicationPeerConfig updatedReplicationConfigForPeer1 = ReplicationPeerConfig.
+        newBuilder(admin.getReplicationPeerConfig("1")).
+        putConfiguration(firstCustomPeerConfigKey, firstCustomPeerConfigUpdatedValue).build();
+
+      // Add configuration 2 for peer "2".
+      ReplicationPeerConfig updatedReplicationConfigForPeer2 = ReplicationPeerConfig.
+        newBuilder(admin.getReplicationPeerConfig("2")).
+        putConfiguration(secondCustomPeerConfigKey, secondCustomPeerConfigUpdatedValue).build();
+
+      admin.updateReplicationPeerConfig("1", updatedReplicationConfigForPeer1);
+      admin.updateReplicationPeerConfig("2", updatedReplicationConfigForPeer2);
+
+      // Validates the configuration is overridden by updateReplicationPeerConfig.
+      Assert.assertEquals(firstCustomPeerConfigUpdatedValue, admin.getReplicationPeerConfig("1").
+        getConfiguration().get(firstCustomPeerConfigKey));
+      Assert.assertEquals(secondCustomPeerConfigUpdatedValue, admin.getReplicationPeerConfig("2").
+        getConfiguration().get(secondCustomPeerConfigKey));
+
+      // Add the second config to the base config and perform a restart.
+      utilities[0].getConfiguration().set(ReplicationPeerConfigUtil.
+        HBASE_REPLICATION_PEER_BASE_CONFIG, firstCustomPeerConfigKey.concat("=").
+        concat(firstCustomPeerConfigValue).concat(";").concat(secondCustomPeerConfigKey)
+        .concat("=").concat(secondCustomPeerConfigValue));
+
+      utilities[0].shutdownMiniHBaseCluster();
+      utilities[0].restartHBaseCluster(1);
+      admin = utilities[0].getAdmin();
+
+      // Both peers retain the value of base configuration 1 they had before the restart:
+      // Peer 1 (updated value), Peer 2 (base value).
+      Assert.assertEquals(firstCustomPeerConfigUpdatedValue, admin.getReplicationPeerConfig("1").
+        getConfiguration().get(firstCustomPeerConfigKey));
+      Assert.assertEquals(firstCustomPeerConfigValue, admin.getReplicationPeerConfig("2").
+        getConfiguration().get(firstCustomPeerConfigKey));
+
+      // Peer 1 gets the new base config as part of the restart.
+      Assert.assertEquals(secondCustomPeerConfigValue, admin.getReplicationPeerConfig("1").
+        getConfiguration().get(secondCustomPeerConfigKey));
+      // Peer 2 retains the updated value it had before the restart.
+      Assert.assertEquals(secondCustomPeerConfigUpdatedValue, admin.getReplicationPeerConfig("2").
+        getConfiguration().get(secondCustomPeerConfigKey));
+    } finally {
+      shutDownMiniClusters();
+      baseConfiguration.unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG);
+    }
+  }
+
   @After
   public void tearDown() throws IOException {
     configurations = null;