You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bh...@apache.org on 2020/09/10 17:11:30 UTC
[hbase] branch master updated: HBASE-24764: Add support of adding
default peer configs via hbase-site.xml for all replication peers. (#2284)
This is an automated email from the ASF dual-hosted git repository.
bharathv pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new 9c5dbb2 HBASE-24764: Add support of adding default peer configs via hbase-site.xml for all replication peers. (#2284)
9c5dbb2 is described below
commit 9c5dbb29c0821078cea6089849e535187a7f195d
Author: ankitjain64 <34...@users.noreply.github.com>
AuthorDate: Thu Sep 10 10:08:44 2020 -0700
HBASE-24764: Add support of adding default peer configs via hbase-site.xml for all replication peers. (#2284)
Signed-off-by: Bharath Vissapragada <bh...@apache.org>
---
.../replication/ReplicationPeerConfigUtil.java | 38 ++++++++++
.../replication/TestZKReplicationPeerStorage.java | 47 +++++++++++++
.../master/replication/ReplicationPeerManager.java | 5 ++
.../hbase/replication/TestMasterReplication.java | 80 ++++++++++++++++++++++
4 files changed, 170 insertions(+)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
index f569e47..c5dcd76 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
@@ -60,6 +61,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
public final class ReplicationPeerConfigUtil {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerConfigUtil.class);
+ public static final String HBASE_REPLICATION_PEER_BASE_CONFIG =
+ "hbase.replication.peer.base.config";
private ReplicationPeerConfigUtil() {}
@@ -450,6 +453,41 @@ public final class ReplicationPeerConfigUtil {
return builder.build();
}
+ /**
+ * Helper method to add base peer configs from Configuration to ReplicationPeerConfig
+ * if not present in latter.
+ *
+ * This merges the user supplied peer configuration
+ * {@link org.apache.hadoop.hbase.replication.ReplicationPeerConfig} with peer configs
+ * provided as property hbase.replication.peer.base.config in hbase configuration.
+ * Expected format for this hbase configuration is "k1=v1;k2=v2,v2_1". Original value
+ * of conf is retained if already present in ReplicationPeerConfig.
+ *
+ * @param conf Configuration
+ * @return ReplicationPeerConfig containing updated configs.
+ */
+ public static ReplicationPeerConfig addBasePeerConfigsIfNotPresent(Configuration conf,
+ ReplicationPeerConfig receivedPeerConfig) {
+ String basePeerConfigs = conf.get(HBASE_REPLICATION_PEER_BASE_CONFIG, "");
+ ReplicationPeerConfigBuilder copiedPeerConfigBuilder = ReplicationPeerConfig.
+ newBuilder(receivedPeerConfig);
+ Map<String,String> receivedPeerConfigMap = receivedPeerConfig.getConfiguration();
+
+ if (basePeerConfigs.length() != 0) {
+ Map<String, String> basePeerConfigMap = Splitter.on(';').trimResults().omitEmptyStrings()
+ .withKeyValueSeparator("=").split(basePeerConfigs);
+ for (Map.Entry<String,String> entry : basePeerConfigMap.entrySet()) {
+ String configName = entry.getKey();
+ String configValue = entry.getValue();
+ // Only add the base config if it is not already present in the existing peer configs
+ if (!receivedPeerConfigMap.containsKey(configName)) {
+ copiedPeerConfigBuilder.putConfiguration(configName, configValue);
+ }
+ }
+ }
+ return copiedPeerConfigBuilder.build();
+ }
+
public static ReplicationPeerConfig appendExcludeTableCFsToReplicationPeerConfig(
Map<TableName, List<String>> excludeTableCfs, ReplicationPeerConfig peerConfig)
throws ReplicationException {
diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java
index 0e7cd74..e7ee1e7 100644
--- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java
+++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -34,9 +35,12 @@ import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.stream.Stream;
+
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseZKTestingUtility;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -215,4 +219,47 @@ public class TestZKReplicationPeerStorage {
assertNotEquals(-1, ZKUtil.checkExists(UTIL.getZooKeeperWatcher(),
STORAGE.getNewSyncReplicationStateNode(peerId)));
}
+
+ @Test
+ public void testBaseReplicationPeerConfig() {
+ String customPeerConfigKey = "hbase.xxx.custom_config";
+ String customPeerConfigValue = "test";
+ String customPeerConfigUpdatedValue = "testUpdated";
+
+ String customPeerConfigSecondKey = "hbase.xxx.custom_second_config";
+ String customPeerConfigSecondValue = "testSecond";
+ String customPeerConfigSecondUpdatedValue = "testSecondUpdated";
+
+ ReplicationPeerConfig existingReplicationPeerConfig = getConfig(1);
+
+ // custom config not present
+ assertEquals(existingReplicationPeerConfig.getConfiguration().get(customPeerConfigKey), null);
+
+ Configuration conf = UTIL.getConfiguration();
+ conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
+ customPeerConfigKey.concat("=").concat(customPeerConfigValue).concat(";").
+ concat(customPeerConfigSecondKey).concat("=").concat(customPeerConfigSecondValue));
+
+ ReplicationPeerConfig updatedReplicationPeerConfig = ReplicationPeerConfigUtil.
+ addBasePeerConfigsIfNotPresent(conf,existingReplicationPeerConfig);
+
+ // validates base configs are present in replicationPeerConfig
+ assertEquals(customPeerConfigValue, updatedReplicationPeerConfig.getConfiguration().
+ get(customPeerConfigKey));
+ assertEquals(customPeerConfigSecondValue, updatedReplicationPeerConfig.getConfiguration().
+ get(customPeerConfigSecondKey));
+
+ // validates base configs do not override the value if the config is already present
+ conf.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
+ customPeerConfigKey.concat("=").concat(customPeerConfigUpdatedValue).concat(";").
+ concat(customPeerConfigSecondKey).concat("=").concat(customPeerConfigSecondUpdatedValue));
+
+ ReplicationPeerConfig replicationPeerConfigAfterValueUpdate = ReplicationPeerConfigUtil.
+ addBasePeerConfigsIfNotPresent(conf,updatedReplicationPeerConfig);
+
+ assertEquals(customPeerConfigValue, replicationPeerConfigAfterValueUpdate.
+ getConfiguration().get(customPeerConfigKey));
+ assertEquals(customPeerConfigSecondValue, replicationPeerConfigAfterValueUpdate.
+ getConfiguration().get(customPeerConfigSecondKey));
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
index 8a5733f..2c930e1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
@@ -232,6 +233,7 @@ public class ReplicationPeerManager {
// this should be a retry, just return
return;
}
+ peerConfig = ReplicationPeerConfigUtil.addBasePeerConfigsIfNotPresent(conf, peerConfig);
ReplicationPeerConfig copiedPeerConfig = ReplicationPeerConfig.newBuilder(peerConfig).build();
SyncReplicationState syncReplicationState =
copiedPeerConfig.isSyncReplication() ? SyncReplicationState.DOWNGRADE_ACTIVE
@@ -544,6 +546,9 @@ public class ReplicationPeerManager {
ConcurrentMap<String, ReplicationPeerDescription> peers = new ConcurrentHashMap<>();
for (String peerId : peerStorage.listPeerIds()) {
ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
+
+ peerConfig = ReplicationPeerConfigUtil.addBasePeerConfigsIfNotPresent(conf, peerConfig);
+ peerStorage.updatePeerConfig(peerId, peerConfig);
boolean enabled = peerStorage.isPeerEnabled(peerId);
SyncReplicationState state = peerStorage.getPeerSyncReplicationState(peerId);
peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig, state));
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
index 7210fc5..b7e5edd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
@@ -29,6 +29,7 @@ import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -71,6 +72,7 @@ import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
@@ -441,6 +443,84 @@ public class TestMasterReplication {
}
}
+ /**
+ * Tests that base replication peer configs are applied on peer creation
+ * and the configs are overridden if updated as part of updateReplicationPeerConfig()
+ *
+ */
+ @Test
+ public void testBasePeerConfigsForPeerMutations()
+ throws Exception {
+ LOG.info("testBasePeerConfigsForPeerMutations");
+ String firstCustomPeerConfigKey = "hbase.xxx.custom_config";
+ String firstCustomPeerConfigValue = "test";
+ String firstCustomPeerConfigUpdatedValue = "test_updated";
+
+ String secondCustomPeerConfigKey = "hbase.xxx.custom_second_config";
+ String secondCustomPeerConfigValue = "testSecond";
+ String secondCustomPeerConfigUpdatedValue = "testSecondUpdated";
+ try {
+ baseConfiguration.set(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG,
+ firstCustomPeerConfigKey.concat("=").concat(firstCustomPeerConfigValue));
+ startMiniClusters(2);
+ addPeer("1", 0, 1);
+ addPeer("2", 0, 1);
+ Admin admin = utilities[0].getAdmin();
+
+ // Validates base config 1 is present for both peers.
+ Assert.assertEquals(firstCustomPeerConfigValue, admin.getReplicationPeerConfig("1").
+ getConfiguration().get(firstCustomPeerConfigKey));
+ Assert.assertEquals(firstCustomPeerConfigValue, admin.getReplicationPeerConfig("2").
+ getConfiguration().get(firstCustomPeerConfigKey));
+
+ // override value of configuration 1 for peer "1".
+ ReplicationPeerConfig updatedReplicationConfigForPeer1 = ReplicationPeerConfig.
+ newBuilder(admin.getReplicationPeerConfig("1")).
+ putConfiguration(firstCustomPeerConfigKey, firstCustomPeerConfigUpdatedValue).build();
+
+ // add configuration 2 for peer "2".
+ ReplicationPeerConfig updatedReplicationConfigForPeer2 = ReplicationPeerConfig.
+ newBuilder(admin.getReplicationPeerConfig("2")).
+ putConfiguration(secondCustomPeerConfigKey, secondCustomPeerConfigUpdatedValue).build();
+
+ admin.updateReplicationPeerConfig("1", updatedReplicationConfigForPeer1);
+ admin.updateReplicationPeerConfig("2", updatedReplicationConfigForPeer2);
+
+ // validates configuration is overridden by updateReplicationPeerConfig
+ Assert.assertEquals(firstCustomPeerConfigUpdatedValue, admin.getReplicationPeerConfig("1").
+ getConfiguration().get(firstCustomPeerConfigKey));
+ Assert.assertEquals(secondCustomPeerConfigUpdatedValue, admin.getReplicationPeerConfig("2").
+ getConfiguration().get(secondCustomPeerConfigKey));
+
+ // Add second config to base config and perform restart.
+ utilities[0].getConfiguration().set(ReplicationPeerConfigUtil.
+ HBASE_REPLICATION_PEER_BASE_CONFIG, firstCustomPeerConfigKey.concat("=").
+ concat(firstCustomPeerConfigValue).concat(";").concat(secondCustomPeerConfigKey)
+ .concat("=").concat(secondCustomPeerConfigValue));
+
+ utilities[0].shutdownMiniHBaseCluster();
+ utilities[0].restartHBaseCluster(1);
+ admin = utilities[0].getAdmin();
+
+ // Both peers retain the value of base configuration 1 from before the restart.
+ // Peer 1 (Update value), Peer 2 (Base Value)
+ Assert.assertEquals(firstCustomPeerConfigUpdatedValue, admin.getReplicationPeerConfig("1").
+ getConfiguration().get(firstCustomPeerConfigKey));
+ Assert.assertEquals(firstCustomPeerConfigValue, admin.getReplicationPeerConfig("2").
+ getConfiguration().get(firstCustomPeerConfigKey));
+
+ // Peer 1 gets new base config as part of restart.
+ Assert.assertEquals(secondCustomPeerConfigValue, admin.getReplicationPeerConfig("1").
+ getConfiguration().get(secondCustomPeerConfigKey));
+ // Peer 2 retains the updated value as before restart.
+ Assert.assertEquals(secondCustomPeerConfigUpdatedValue, admin.getReplicationPeerConfig("2").
+ getConfiguration().get(secondCustomPeerConfigKey));
+ } finally {
+ shutDownMiniClusters();
+ baseConfiguration.unset(ReplicationPeerConfigUtil.HBASE_REPLICATION_PEER_BASE_CONFIG);
+ }
+ }
+
@After
public void tearDown() throws IOException {
configurations = null;