You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2017/11/23 07:07:11 UTC
hbase git commit: HBASE-16868 Add a replicate_all flag to avoid
misuse the namespaces and table-cfs config of replication peer
Repository: hbase
Updated Branches:
refs/heads/master 2442cbb6a -> 3e2941a49
HBASE-16868 Add a replicate_all flag to avoid misuse the namespaces and table-cfs config of replication peer
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/3e2941a4
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/3e2941a4
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/3e2941a4
Branch: refs/heads/master
Commit: 3e2941a49e58080618fd2d2e6757694c96651e0a
Parents: 2442cbb
Author: Guanghao Zhang <zg...@apache.org>
Authored: Thu Nov 23 14:54:19 2017 +0800
Committer: Guanghao Zhang <zg...@apache.org>
Committed: Thu Nov 23 14:54:19 2017 +0800
----------------------------------------------------------------------
.../replication/ReplicationPeerConfigUtil.java | 10 +-
.../replication/ReplicationPeerConfig.java | 12 ++
.../src/main/protobuf/Replication.proto | 1 +
.../replication/ReplicationPeersZKImpl.java | 1 +
.../org/apache/hadoop/hbase/master/HMaster.java | 6 +-
.../master/replication/ReplicationManager.java | 25 ++-
.../NamespaceTableCfWALEntryFilter.java | 103 ++++++-----
.../master/ReplicationPeerConfigUpgrader.java | 184 +++++++++++++++++++
.../replication/master/TableCFsUpdater.java | 151 ---------------
.../client/TestAsyncReplicationAdminApi.java | 9 +
...estAsyncReplicationAdminApiWithClusters.java | 2 +
.../replication/TestReplicationAdmin.java | 116 ++++++++++--
.../TestReplicationAdminWithClusters.java | 12 +-
.../replication/TestMasterReplication.java | 8 +-
.../replication/TestNamespaceReplication.java | 6 +-
.../replication/TestPerTableCFReplication.java | 4 +-
.../TestReplicationWALEntryFilters.java | 69 +++++--
.../replication/master/TestTableCFsUpdater.java | 4 +-
.../src/main/ruby/hbase/replication_admin.rb | 9 +
hbase-shell/src/main/ruby/shell.rb | 1 +
.../src/main/ruby/shell/commands/list_peers.rb | 5 +-
.../shell/commands/set_peer_replicate_all.rb | 54 ++++++
.../test/ruby/hbase/replication_admin_test.rb | 95 +++++++---
23 files changed, 602 insertions(+), 285 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/3e2941a4/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
index be468ae..52a3c93 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
@@ -286,6 +286,7 @@ public final class ReplicationPeerConfigUtil {
if (tableCFsMap != null) {
peerConfig.setTableCFsMap(tableCFsMap);
}
+
List<ByteString> namespacesList = peer.getNamespacesList();
if (namespacesList != null && namespacesList.size() != 0) {
Set<String> namespaces = new HashSet<>();
@@ -294,9 +295,15 @@ public final class ReplicationPeerConfigUtil {
}
peerConfig.setNamespaces(namespaces);
}
+
if (peer.hasBandwidth()) {
peerConfig.setBandwidth(peer.getBandwidth());
}
+
+ if (peer.hasReplicateAll()) {
+ peerConfig.setReplicateAllUserTables(peer.getReplicateAll());
+ }
+
return peerConfig;
}
@@ -338,6 +345,7 @@ public final class ReplicationPeerConfigUtil {
}
builder.setBandwidth(peerConfig.getBandwidth());
+ builder.setReplicateAll(peerConfig.replicateAllUserTables());
return builder.build();
}
@@ -465,4 +473,4 @@ public final class ReplicationPeerConfigUtil {
return otherConf;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/3e2941a4/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
index 4d429c9..9e20829 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
@@ -42,6 +42,8 @@ public class ReplicationPeerConfig {
private Map<TableName, ? extends Collection<String>> tableCFsMap = null;
private Set<String> namespaces = null;
private long bandwidth = 0;
+ // Default value is true, means replicate all user tables to peer cluster.
+ private boolean replicateAllUserTables = true;
public ReplicationPeerConfig() {
this.peerData = new TreeMap<>(Bytes.BYTES_COMPARATOR);
@@ -110,10 +112,20 @@ public class ReplicationPeerConfig {
return this;
}
+ public boolean replicateAllUserTables() {
+ return this.replicateAllUserTables;
+ }
+
+ public ReplicationPeerConfig setReplicateAllUserTables(boolean replicateAllUserTables) {
+ this.replicateAllUserTables = replicateAllUserTables;
+ return this;
+ }
+
@Override
public String toString() {
StringBuilder builder = new StringBuilder("clusterKey=").append(clusterKey).append(",");
builder.append("replicationEndpointImpl=").append(replicationEndpointImpl).append(",");
+ builder.append("replicateAllUserTables=").append(replicateAllUserTables).append(",");
if (namespaces != null) {
builder.append("namespaces=").append(namespaces.toString()).append(",");
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/3e2941a4/hbase-protocol-shaded/src/main/protobuf/Replication.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/protobuf/Replication.proto b/hbase-protocol-shaded/src/main/protobuf/Replication.proto
index 88efa00..a1a7ade 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Replication.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Replication.proto
@@ -45,6 +45,7 @@ message ReplicationPeer {
repeated TableCF table_cfs = 5;
repeated bytes namespaces = 6;
optional int64 bandwidth = 7;
+ optional bool replicate_all = 8;
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/3e2941a4/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
index b7564f4..2c3bbd5 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
@@ -368,6 +368,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
existingConfig.setTableCFsMap(newConfig.getTableCFsMap());
existingConfig.setNamespaces(newConfig.getNamespaces());
existingConfig.setBandwidth(newConfig.getBandwidth());
+ existingConfig.setReplicateAllUserTables(newConfig.replicateAllUserTables());
try {
ZKUtil.setData(this.zookeeper, getPeerNode(id),
http://git-wip-us.apache.org/repos/asf/hbase/blob/3e2941a4/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index cfbddfc..cde3581 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -156,7 +156,7 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
-import org.apache.hadoop.hbase.replication.master.TableCFsUpdater;
+import org.apache.hadoop.hbase.replication.master.ReplicationPeerConfigUpgrader;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.security.UserProvider;
@@ -798,9 +798,9 @@ public class HMaster extends HRegionServer implements MasterServices {
// This is for backwards compatibility
// See HBASE-11393
status.setStatus("Update TableCFs node in ZNode");
- TableCFsUpdater tableCFsUpdater = new TableCFsUpdater(zooKeeper,
+ ReplicationPeerConfigUpgrader tableCFsUpdater = new ReplicationPeerConfigUpgrader(zooKeeper,
conf, this.clusterConnection);
- tableCFsUpdater.update();
+ tableCFsUpdater.copyTableCFs();
// Add the Observer to delete space quotas on table deletion before starting all CPs by
// default with quota support, avoiding if user specifically asks to not load this Observer.
http://git-wip-us.apache.org/repos/asf/hbase/blob/3e2941a4/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java
index 3615992..f2a6c85 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationManager.java
@@ -71,9 +71,7 @@ public class ReplicationManager {
public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
throws ReplicationException, IOException {
- checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
- peerConfig.getTableCFsMap());
- checkConfiguredWALEntryFilters(peerConfig);
+ checkPeerConfig(peerConfig);
replicationPeers.registerPeer(peerId, peerConfig, enabled);
replicationPeers.peerConnected(peerId);
}
@@ -102,9 +100,7 @@ public class ReplicationManager {
public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
throws ReplicationException, IOException {
- checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
- peerConfig.getTableCFsMap());
- checkConfiguredWALEntryFilters(peerConfig);
+ checkPeerConfig(peerConfig);
this.replicationPeers.updatePeerConfig(peerId, peerConfig);
}
@@ -122,6 +118,21 @@ public class ReplicationManager {
return peers;
}
+ private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws ReplicationException,
+ IOException {
+ if (peerConfig.replicateAllUserTables()) {
+ if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty())
+ || (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) {
+ throw new ReplicationException(
+ "Need clean namespaces or table-cfs config fisrtly when you want replicate all cluster");
+ }
+ } else {
+ checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
+ peerConfig.getTableCFsMap());
+ }
+ checkConfiguredWALEntryFilters(peerConfig);
+ }
+
/**
* Set a namespace in the peer config means that all tables in this namespace
* will be replicated to the peer cluster.
@@ -150,8 +161,6 @@ public class ReplicationManager {
"Table-cfs config conflict with namespaces config in peer");
}
}
-
-
}
private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig)
http://git-wip-us.apache.org/repos/asf/hbase/blob/3e2941a4/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java
index 5591974..9a4cc6c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java
@@ -58,69 +58,74 @@ public class NamespaceTableCfWALEntryFilter implements WALEntryFilter, WALCellFi
public Entry filter(Entry entry) {
TableName tabName = entry.getKey().getTablename();
String namespace = tabName.getNamespaceAsString();
- Set<String> namespaces = this.peer.getNamespaces();
- Map<TableName, List<String>> tableCFs = getTableCfs();
+ ReplicationPeerConfig peerConfig = this.peer.getPeerConfig();
- // If null means user has explicitly not configured any namespaces and table CFs
- // so all the tables data are applicable for replication
- if (namespaces == null && tableCFs == null) {
+ if (peerConfig.replicateAllUserTables()) {
+ // replicate all user tables, so return entry directly
return entry;
- }
+ } else {
+ // Not replicate all user tables, so filter by namespaces and table-cfs config
+ Set<String> namespaces = peerConfig.getNamespaces();
+ Map<TableName, List<String>> tableCFs = peerConfig.getTableCFsMap();
- // First filter by namespaces config
- // If table's namespace in peer config, all the tables data are applicable for replication
- if (namespaces != null && namespaces.contains(namespace)) {
- return entry;
- }
+ if (namespaces == null && tableCFs == null) {
+ return null;
+ }
- // Then filter by table-cfs config
- // return null(prevent replicating) if logKey's table isn't in this peer's
- // replicaable namespace list and table list
- if (tableCFs == null || !tableCFs.containsKey(tabName)) {
- return null;
- }
+ // First filter by namespaces config
+ // If table's namespace in peer config, all the tables data are applicable for replication
+ if (namespaces != null && namespaces.contains(namespace)) {
+ return entry;
+ }
+
+ // Then filter by table-cfs config
+ // return null(prevent replicating) if logKey's table isn't in this peer's
+ // replicaable namespace list and table list
+ if (tableCFs == null || !tableCFs.containsKey(tabName)) {
+ return null;
+ }
- return entry;
+ return entry;
+ }
}
@Override
public Cell filterCell(final Entry entry, Cell cell) {
- final Map<TableName, List<String>> tableCfs = getTableCfs();
- if (tableCfs == null) return cell;
- TableName tabName = entry.getKey().getTablename();
- List<String> cfs = tableCfs.get(tabName);
- // ignore(remove) kv if its cf isn't in the replicable cf list
- // (empty cfs means all cfs of this table are replicable)
- if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
- cell = bulkLoadFilter.filterCell(cell, new Predicate<byte[]>() {
- @Override
- public boolean apply(byte[] fam) {
- if (tableCfs != null) {
- List<String> cfs = tableCfs.get(entry.getKey().getTablename());
- if (cfs != null && !cfs.contains(Bytes.toString(fam))) {
- return true;
+ ReplicationPeerConfig peerConfig = this.peer.getPeerConfig();
+ if (peerConfig.replicateAllUserTables()) {
+ // replicate all user tables, so return cell directly
+ return cell;
+ } else {
+ final Map<TableName, List<String>> tableCfs = peerConfig.getTableCFsMap();
+ if (tableCfs == null) {
+ return cell;
+ }
+ TableName tabName = entry.getKey().getTablename();
+ List<String> cfs = tableCfs.get(tabName);
+ // ignore(remove) kv if its cf isn't in the replicable cf list
+ // (empty cfs means all cfs of this table are replicable)
+ if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
+ cell = bulkLoadFilter.filterCell(cell, new Predicate<byte[]>() {
+ @Override
+ public boolean apply(byte[] fam) {
+ if (tableCfs != null) {
+ List<String> cfs = tableCfs.get(entry.getKey().getTablename());
+ if (cfs != null && !cfs.contains(Bytes.toString(fam))) {
+ return true;
+ }
}
+ return false;
}
- return false;
+ });
+ } else {
+ if ((cfs != null)
+ && !cfs.contains(Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(),
+ cell.getFamilyLength()))) {
+ return null;
}
- });
- } else {
- if ((cfs != null) && !cfs.contains(
- Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()))) {
- return null;
}
- }
- return cell;
- }
- Map<TableName, List<String>> getTableCfs() {
- Map<TableName, List<String>> tableCFs = null;
- try {
- tableCFs = this.peer.getTableCFs();
- } catch (IllegalArgumentException e) {
- LOG.error("should not happen: can't get tableCFs for peer " + peer.getId() +
- ", degenerate as if it's not configured by keeping tableCFs==null");
+ return cell;
}
- return tableCFs;
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/3e2941a4/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationPeerConfigUpgrader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationPeerConfigUpgrader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationPeerConfigUpgrader.java
new file mode 100644
index 0000000..5c8fba3
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationPeerConfigUpgrader.java
@@ -0,0 +1,184 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.master;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.apache.zookeeper.KeeperException;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
+
+/**
+ * This class is used to upgrade TableCFs from HBase 1.0, 1.1, 1.2, 1.3 to HBase 1.4 or 2.x.
+ * It will be removed in HBase 3.x. See HBASE-11393
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class ReplicationPeerConfigUpgrader extends ReplicationStateZKBase {
+
+ private static final Log LOG = LogFactory.getLog(ReplicationPeerConfigUpgrader.class);
+
+ public ReplicationPeerConfigUpgrader(ZKWatcher zookeeper,
+ Configuration conf, Abortable abortable) {
+ super(zookeeper, conf, abortable);
+ }
+
+ public void upgrade() throws Exception {
+ try (Connection conn = ConnectionFactory.createConnection(conf)) {
+ Admin admin = conn.getAdmin();
+ admin.listReplicationPeers().forEach(
+ (peerDesc) -> {
+ String peerId = peerDesc.getPeerId();
+ ReplicationPeerConfig peerConfig = peerDesc.getPeerConfig();
+ if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty())
+ || (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) {
+ peerConfig.setReplicateAllUserTables(false);
+ try {
+ admin.updateReplicationPeerConfig(peerId, peerConfig);
+ } catch (Exception e) {
+ LOG.error("Failed to upgrade replication peer config for peerId=" + peerId, e);
+ }
+ }
+ });
+ }
+ }
+
+ public void copyTableCFs() {
+ List<String> znodes = null;
+ try {
+ znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
+ } catch (KeeperException e) {
+ LOG.error("Failed to get peers znode", e);
+ }
+ if (znodes != null) {
+ for (String peerId : znodes) {
+ if (!copyTableCFs(peerId)) {
+ LOG.error("upgrade tableCFs failed for peerId=" + peerId);
+ }
+ }
+ }
+ }
+
+ public boolean copyTableCFs(String peerId) {
+ String tableCFsNode = getTableCFsNode(peerId);
+ try {
+ if (ZKUtil.checkExists(zookeeper, tableCFsNode) != -1) {
+ String peerNode = getPeerNode(peerId);
+ ReplicationPeerConfig rpc = getReplicationPeerConig(peerNode);
+ // We only need to copy data from tableCFs node to rpc Node the first time hmaster start.
+ if (rpc.getTableCFsMap() == null || rpc.getTableCFsMap().isEmpty()) {
+ // we copy TableCFs node into PeerNode
+ LOG.info("copy tableCFs into peerNode:" + peerId);
+ ReplicationProtos.TableCF[] tableCFs =
+ ReplicationPeerConfigUtil.parseTableCFs(
+ ZKUtil.getData(this.zookeeper, tableCFsNode));
+ if (tableCFs != null && tableCFs.length > 0) {
+ rpc.setTableCFsMap(ReplicationPeerConfigUtil.convert2Map(tableCFs));
+ ZKUtil.setData(this.zookeeper, peerNode,
+ ReplicationPeerConfigUtil.toByteArray(rpc));
+ }
+ } else {
+ LOG.info("No tableCFs in peerNode:" + peerId);
+ }
+ }
+ } catch (KeeperException e) {
+ LOG.warn("NOTICE!! Update peerId failed, peerId=" + peerId, e);
+ return false;
+ } catch (InterruptedException e) {
+ LOG.warn("NOTICE!! Update peerId failed, peerId=" + peerId, e);
+ return false;
+ } catch (IOException e) {
+ LOG.warn("NOTICE!! Update peerId failed, peerId=" + peerId, e);
+ return false;
+ }
+ return true;
+ }
+
+ private ReplicationPeerConfig getReplicationPeerConig(String peerNode)
+ throws KeeperException, InterruptedException {
+ byte[] data = null;
+ data = ZKUtil.getData(this.zookeeper, peerNode);
+ if (data == null) {
+ LOG.error("Could not get configuration for " +
+ "peer because it doesn't exist. peer=" + peerNode);
+ return null;
+ }
+ try {
+ return ReplicationPeerConfigUtil.parsePeerFrom(data);
+ } catch (DeserializationException e) {
+ LOG.warn("Failed to parse cluster key from peer=" + peerNode);
+ return null;
+ }
+ }
+
+ private static void printUsageAndExit() {
+ System.err.printf(
+ "Usage: hbase org.apache.hadoop.hbase.replication.master.ReplicationPeerConfigUpgrader"
+ + " [options]");
+ System.err.println(" where [options] are:");
+ System.err.println(" -h|-help Show this help and exit.");
+ System.err.println(" copyTableCFs Copy table-cfs to replication peer config");
+ System.err.println(" upgrade Upgrade replication peer config to new format");
+ System.err.println();
+ System.exit(1);
+ }
+
+ public static void main(String[] args) throws Exception {
+ if (args.length != 1) {
+ printUsageAndExit();
+ }
+ if (args[0].equals("-help") || args[0].equals("-h")) {
+ printUsageAndExit();
+ } else if (args[0].equals("copyTableCFs")) {
+ Configuration conf = HBaseConfiguration.create();
+ ZKWatcher zkw = new ZKWatcher(conf, "ReplicationPeerConfigUpgrader", null);
+ try {
+ ReplicationPeerConfigUpgrader tableCFsUpdater = new ReplicationPeerConfigUpgrader(zkw,
+ conf, null);
+ tableCFsUpdater.copyTableCFs();
+ } finally {
+ zkw.close();
+ }
+ } else if (args[0].equals("upgrade")) {
+ Configuration conf = HBaseConfiguration.create();
+ ZKWatcher zkw = new ZKWatcher(conf, "ReplicationPeerConfigUpgrader", null);
+ ReplicationPeerConfigUpgrader upgrader = new ReplicationPeerConfigUpgrader(zkw, conf, null);
+ upgrader.upgrade();
+ } else {
+ printUsageAndExit();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/3e2941a4/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/TableCFsUpdater.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/TableCFsUpdater.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/TableCFsUpdater.java
deleted file mode 100644
index f442495..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/TableCFsUpdater.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.master;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
-import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
-import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
-import org.apache.zookeeper.KeeperException;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * This class is used to upgrade TableCFs from HBase 1.0, 1.1, 1.2, 1.3 to HBase 1.4 or 2.x.
- * It will be removed in HBase 3.x. See HBASE-11393
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class TableCFsUpdater extends ReplicationStateZKBase {
-
- private static final Log LOG = LogFactory.getLog(TableCFsUpdater.class);
-
- public TableCFsUpdater(ZKWatcher zookeeper,
- Configuration conf, Abortable abortable) {
- super(zookeeper, conf, abortable);
- }
-
- public void update() {
- List<String> znodes = null;
- try {
- znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
- } catch (KeeperException e) {
- LOG.error("Failed to get peers znode", e);
- }
- if (znodes != null) {
- for (String peerId : znodes) {
- if (!update(peerId)) {
- LOG.error("upgrade tableCFs failed for peerId=" + peerId);
- }
- }
- }
- }
-
- public boolean update(String peerId) {
- String tableCFsNode = getTableCFsNode(peerId);
- try {
- if (ZKUtil.checkExists(zookeeper, tableCFsNode) != -1) {
- String peerNode = getPeerNode(peerId);
- ReplicationPeerConfig rpc = getReplicationPeerConig(peerNode);
- // We only need to copy data from tableCFs node to rpc Node the first time hmaster start.
- if (rpc.getTableCFsMap() == null || rpc.getTableCFsMap().isEmpty()) {
- // we copy TableCFs node into PeerNode
- LOG.info("copy tableCFs into peerNode:" + peerId);
- ReplicationProtos.TableCF[] tableCFs =
- ReplicationPeerConfigUtil.parseTableCFs(
- ZKUtil.getData(this.zookeeper, tableCFsNode));
- if (tableCFs != null && tableCFs.length > 0) {
- rpc.setTableCFsMap(ReplicationPeerConfigUtil.convert2Map(tableCFs));
- ZKUtil.setData(this.zookeeper, peerNode,
- ReplicationPeerConfigUtil.toByteArray(rpc));
- }
- } else {
- LOG.info("No tableCFs in peerNode:" + peerId);
- }
- }
- } catch (KeeperException e) {
- LOG.warn("NOTICE!! Update peerId failed, peerId=" + peerId, e);
- return false;
- } catch (InterruptedException e) {
- LOG.warn("NOTICE!! Update peerId failed, peerId=" + peerId, e);
- return false;
- } catch (IOException e) {
- LOG.warn("NOTICE!! Update peerId failed, peerId=" + peerId, e);
- return false;
- }
- return true;
- }
-
- private ReplicationPeerConfig getReplicationPeerConig(String peerNode)
- throws KeeperException, InterruptedException {
- byte[] data = null;
- data = ZKUtil.getData(this.zookeeper, peerNode);
- if (data == null) {
- LOG.error("Could not get configuration for " +
- "peer because it doesn't exist. peer=" + peerNode);
- return null;
- }
- try {
- return ReplicationPeerConfigUtil.parsePeerFrom(data);
- } catch (DeserializationException e) {
- LOG.warn("Failed to parse cluster key from peer=" + peerNode);
- return null;
- }
- }
-
- private static void printUsageAndExit() {
- System.err.printf("Usage: hbase org.apache.hadoop.hbase.replication.master.TableCFsUpdater [options]");
- System.err.println(" where [options] are:");
- System.err.println(" -h|-help Show this help and exit.");
- System.err.println(" update Copy table-cfs to replication peer config");
- System.err.println();
- System.exit(1);
- }
-
- public static void main(String[] args) throws Exception {
- if (args.length != 1) {
- printUsageAndExit();
- }
- if (args[0].equals("-help") || args[0].equals("-h")) {
- printUsageAndExit();
- } else if (args[0].equals("update")) {
- Configuration conf = HBaseConfiguration.create();
- ZKWatcher zkw = new ZKWatcher(conf, "TableCFsUpdater", null);
- try {
- TableCFsUpdater tableCFsUpdater = new TableCFsUpdater(zkw, conf, null);
- tableCFsUpdater.update();
- } finally {
- zkw.close();
- }
- } else {
- printUsageAndExit();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/3e2941a4/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
index e489078..6591826 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
@@ -149,6 +149,8 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
// Add a valid peer
admin.addReplicationPeer(ID_ONE, rpc1).join();
+ rpc1.setReplicateAllUserTables(false);
+ admin.updateReplicationPeerConfig(ID_ONE, rpc1).join();
Map<TableName, List<String>> tableCFs = new HashMap<>();
@@ -248,6 +250,9 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
final TableName tableName4 = TableName.valueOf(tableName.getNameAsString() + "t4");
// Add a valid peer
admin.addReplicationPeer(ID_ONE, rpc1).join();
+ rpc1.setReplicateAllUserTables(false);
+ admin.updateReplicationPeerConfig(ID_ONE, rpc1).join();
+
Map<TableName, List<String>> tableCFs = new HashMap<>();
try {
tableCFs.put(tableName3, null);
@@ -328,6 +333,8 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
rpc.setClusterKey(KEY_ONE);
admin.addReplicationPeer(ID_ONE, rpc).join();
+ rpc.setReplicateAllUserTables(false);
+ admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
// add ns1 and ns2 to peer config
rpc = admin.getReplicationPeerConfig(ID_ONE).get();
@@ -364,6 +371,8 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
rpc.setClusterKey(KEY_ONE);
admin.addReplicationPeer(ID_ONE, rpc).join();
+ rpc.setReplicateAllUserTables(false);
+ admin.updateReplicationPeerConfig(ID_ONE, rpc).join();
rpc = admin.getReplicationPeerConfig(ID_ONE).get();
Set<String> namespaces = new HashSet<String>();
http://git-wip-us.apache.org/repos/asf/hbase/blob/3e2941a4/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApiWithClusters.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApiWithClusters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApiWithClusters.java
index 4b88bf7..9ceb172 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApiWithClusters.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApiWithClusters.java
@@ -218,6 +218,7 @@ public class TestAsyncReplicationAdminApiWithClusters extends TestAsyncAdminBase
Map<TableName, ? extends Collection<String>> tableCfs = new HashMap<>();
tableCfs.put(tableName, null);
ReplicationPeerConfig rpc = admin.getReplicationPeerConfig(ID_SECOND).get();
+ rpc.setReplicateAllUserTables(false);
rpc.setTableCFsMap(tableCfs);
try {
// Only add tableName to replication peer config
@@ -236,6 +237,7 @@ public class TestAsyncReplicationAdminApiWithClusters extends TestAsyncAdminBase
admin2.tableExists(tableName2).get());
} finally {
rpc.setTableCFsMap(null);
+ rpc.setReplicateAllUserTables(true);
admin.updateReplicationPeerConfig(ID_SECOND, rpc).join();
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/3e2941a4/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
index 036706a..19f117b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
@@ -85,10 +85,9 @@ public class TestReplicationAdmin {
*/
@BeforeClass
public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
TEST_UTIL.startMiniCluster();
- Configuration conf = TEST_UTIL.getConfiguration();
- conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
- admin = new ReplicationAdmin(conf);
+ admin = new ReplicationAdmin(TEST_UTIL.getConfiguration());
hbaseAdmin = TEST_UTIL.getAdmin();
}
@@ -238,8 +237,8 @@ public class TestReplicationAdmin {
@Test
public void testAppendPeerTableCFs() throws Exception {
- ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
- rpc1.setClusterKey(KEY_ONE);
+ ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+ rpc.setClusterKey(KEY_ONE);
final TableName tableName1 = TableName.valueOf(name.getMethodName() + "t1");
final TableName tableName2 = TableName.valueOf(name.getMethodName() + "t2");
final TableName tableName3 = TableName.valueOf(name.getMethodName() + "t3");
@@ -248,10 +247,14 @@ public class TestReplicationAdmin {
final TableName tableName6 = TableName.valueOf(name.getMethodName() + "t6");
// Add a valid peer
- admin.addPeer(ID_ONE, rpc1, null);
+ hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
- Map<TableName, List<String>> tableCFs = new HashMap<>();
+ // Update peer config, not replicate all user tables
+ rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
+ rpc.setReplicateAllUserTables(false);
+ hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
+ Map<TableName, List<String>> tableCFs = new HashMap<>();
tableCFs.put(tableName1, null);
admin.appendPeerTableCFs(ID_ONE, tableCFs);
Map<TableName, List<String>> result =
@@ -338,14 +341,21 @@ public class TestReplicationAdmin {
@Test
public void testRemovePeerTableCFs() throws Exception {
- ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
- rpc1.setClusterKey(KEY_ONE);
+ ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+ rpc.setClusterKey(KEY_ONE);
final TableName tableName1 = TableName.valueOf(name.getMethodName() + "t1");
final TableName tableName2 = TableName.valueOf(name.getMethodName() + "t2");
final TableName tableName3 = TableName.valueOf(name.getMethodName() + "t3");
final TableName tableName4 = TableName.valueOf(name.getMethodName() + "t4");
+
// Add a valid peer
- admin.addPeer(ID_ONE, rpc1, null);
+ hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
+
+ // Update peer config, not replicate all user tables
+ rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
+ rpc.setReplicateAllUserTables(false);
+ hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
+
Map<TableName, List<String>> tableCFs = new HashMap<>();
try {
tableCFs.put(tableName3, null);
@@ -423,27 +433,98 @@ public class TestReplicationAdmin {
rpc.setClusterKey(KEY_ONE);
hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
- rpc = admin.getPeerConfig(ID_ONE);
+ rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
+ rpc.setReplicateAllUserTables(false);
+ hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
+
+ rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
Set<String> namespaces = new HashSet<>();
namespaces.add(ns1);
namespaces.add(ns2);
rpc.setNamespaces(namespaces);
- admin.updatePeerConfig(ID_ONE, rpc);
- namespaces = admin.getPeerConfig(ID_ONE).getNamespaces();
+ hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
+ namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getNamespaces();
assertEquals(2, namespaces.size());
assertTrue(namespaces.contains(ns1));
assertTrue(namespaces.contains(ns2));
- rpc = admin.getPeerConfig(ID_ONE);
+ rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
namespaces.clear();
namespaces.add(ns1);
rpc.setNamespaces(namespaces);
- admin.updatePeerConfig(ID_ONE, rpc);
- namespaces = admin.getPeerConfig(ID_ONE).getNamespaces();
+ hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
+ namespaces = hbaseAdmin.getReplicationPeerConfig(ID_ONE).getNamespaces();
assertEquals(1, namespaces.size());
assertTrue(namespaces.contains(ns1));
- admin.removePeer(ID_ONE);
+ hbaseAdmin.removeReplicationPeer(ID_ONE);
+ }
+
+ @Test
+ public void testSetReplicateAllUserTables() throws Exception {
+ ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+ rpc.setClusterKey(KEY_ONE);
+ hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
+
+ rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
+ assertTrue(rpc.replicateAllUserTables());
+
+ rpc.setReplicateAllUserTables(false);
+ hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
+ rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
+ assertFalse(rpc.replicateAllUserTables());
+
+ rpc.setReplicateAllUserTables(true);
+ hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
+ rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
+ assertTrue(rpc.replicateAllUserTables());
+
+ hbaseAdmin.removeReplicationPeer(ID_ONE);
+ }
+
+ @Test
+ public void testPeerConfigConflict() throws Exception {
+ // Default replicate all flag is true
+ ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+ rpc.setClusterKey(KEY_ONE);
+
+ String ns1 = "ns1";
+ Set<String> namespaces = new HashSet<String>();
+ namespaces.add(ns1);
+
+ TableName tab1 = TableName.valueOf("ns1:tabl");
+ Map<TableName, List<String>> tableCfs = new HashMap<TableName, List<String>>();
+ tableCfs.put(tab1, new ArrayList<String>());
+
+ try {
+ rpc.setNamespaces(namespaces);
+ hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
+ fail("Should throw Exception. When replicate all flag is true, no need to config namespaces");
+ } catch (IOException e) {
+ // OK
+ rpc.setNamespaces(null);
+ }
+
+ try {
+ rpc.setTableCFsMap(tableCfs);
+ hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
+ fail("Should throw Exception. When replicate all flag is true, no need to config table-cfs");
+ } catch (IOException e) {
+ // OK
+ rpc.setTableCFsMap(null);
+ }
+
+ try {
+ rpc.setNamespaces(namespaces);
+ rpc.setTableCFsMap(tableCfs);
+ hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
+ fail("Should throw Exception."
+ + " When replicate all flag is true, no need to config namespaces or table-cfs");
+ } catch (IOException e) {
+ // OK
+ rpc.setNamespaces(null);
+ rpc.setTableCFsMap(null);
+ }
}
@Test
@@ -455,6 +536,7 @@ public class TestReplicationAdmin {
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
rpc.setClusterKey(KEY_ONE);
+ rpc.setReplicateAllUserTables(false);
hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
rpc = admin.getPeerConfig(ID_ONE);
http://git-wip-us.apache.org/repos/asf/hbase/blob/3e2941a4/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java
index 2610313..3b7fd84 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java
@@ -194,7 +194,13 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase {
admin2.disableTable(TestReplicationBase.tableName);
admin2.deleteTable(TestReplicationBase.tableName);
}
- assertFalse("Table should not exists in the peer cluster", admin2.isTableAvailable(TestReplicationBase.tableName));
+ assertFalse("Table should not exists in the peer cluster",
+ admin2.isTableAvailable(TestReplicationBase.tableName));
+
+ // update peer config
+ ReplicationPeerConfig rpc = admin1.getReplicationPeerConfig(peerId);
+ rpc.setReplicateAllUserTables(false);
+ admin1.updateReplicationPeerConfig(peerId, rpc);
Map<TableName, ? extends Collection<String>> tableCfs = new HashMap<>();
tableCfs.put(tableName, null);
@@ -214,6 +220,10 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase {
} finally {
adminExt.removePeerTableCFs(peerId, adminExt.getPeerTableCFs(peerId));
admin1.disableTableReplication(TestReplicationBase.tableName);
+
+ rpc = admin1.getReplicationPeerConfig(peerId);
+ rpc.setReplicateAllUserTables(true);
+ admin1.updateReplicationPeerConfig(peerId, rpc);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/3e2941a4/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
index 58b22c8..ac53551 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
@@ -524,10 +524,12 @@ public class TestMasterReplication {
private void addPeer(String id, int masterClusterNumber, int slaveClusterNumber, String tableCfs)
throws Exception {
- try (Admin admin = ConnectionFactory.createConnection(configurations[masterClusterNumber])
- .getAdmin()) {
- admin.addReplicationPeer(id,
+ try (Admin admin =
+ ConnectionFactory.createConnection(configurations[masterClusterNumber]).getAdmin()) {
+ admin.addReplicationPeer(
+ id,
new ReplicationPeerConfig().setClusterKey(utilities[slaveClusterNumber].getClusterKey())
+ .setReplicateAllUserTables(false)
.setTableCFsMap(ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableCfs)));
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/3e2941a4/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplication.java
index 433a345..0d7a92d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplication.java
@@ -140,8 +140,12 @@ public class TestNamespaceReplication extends TestReplicationBase {
Table htab1B = connection1.getTable(tabBName);
Table htab2B = connection2.getTable(tabBName);
- // add ns1 to peer config which replicate to cluster2
ReplicationPeerConfig rpc = admin.getPeerConfig("2");
+ rpc.setReplicateAllUserTables(false);
+ admin.updatePeerConfig("2", rpc);
+
+ // add ns1 to peer config which replicate to cluster2
+ rpc = admin.getPeerConfig("2");
Set<String> namespaces = new HashSet<>();
namespaces.add(ns1);
rpc.setNamespaces(namespaces);
http://git-wip-us.apache.org/repos/asf/hbase/blob/3e2941a4/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java
index 84ce9a3..e9c352d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java
@@ -404,6 +404,7 @@ public class TestPerTableCFReplication {
// A. add cluster2/cluster3 as peers to cluster1
ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
rpc2.setClusterKey(utility2.getClusterKey());
+ rpc2.setReplicateAllUserTables(false);
Map<TableName, List<String>> tableCFs = new HashMap<>();
tableCFs.put(tabCName, null);
tableCFs.put(tabBName, new ArrayList<>());
@@ -413,6 +414,7 @@ public class TestPerTableCFReplication {
ReplicationPeerConfig rpc3 = new ReplicationPeerConfig();
rpc3.setClusterKey(utility3.getClusterKey());
+ rpc3.setReplicateAllUserTables(false);
tableCFs.clear();
tableCFs.put(tabAName, null);
tableCFs.put(tabBName, new ArrayList<>());
@@ -518,7 +520,7 @@ public class TestPerTableCFReplication {
connection2.close();
connection3.close();
}
- }
+ }
private void ensureRowNotReplicated(byte[] row, byte[] fam, Table... tables) throws IOException {
Get get = new Get(row);
http://git-wip-us.apache.org/repos/asf/hbase/blob/3e2941a4/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
index be65576..9fda8bc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
@@ -202,19 +202,31 @@ public class TestReplicationWALEntryFilters {
@Test
public void testNamespaceTableCfWALEntryFilter() {
ReplicationPeer peer = mock(ReplicationPeer.class);
+ ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class);
- // 1. no namespaces config and table-cfs config in peer
- when(peer.getNamespaces()).thenReturn(null);
- when(peer.getTableCFs()).thenReturn(null);
+ // 1. replicate all user tables
+ when(peerConfig.replicateAllUserTables()).thenReturn(true);
+ when(peer.getPeerConfig()).thenReturn(peerConfig);
Entry userEntry = createEntry(null, a, b, c);
- WALEntryFilter filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
- assertEquals(createEntry(null, a,b,c), filter.filter(userEntry));
+ ChainWALEntryFilter filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
+ assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
+
+ // 2. not replicate all user tables, no namespaces and table-cfs config
+ when(peerConfig.replicateAllUserTables()).thenReturn(false);
+ when(peerConfig.getNamespaces()).thenReturn(null);
+ when(peerConfig.getTableCFsMap()).thenReturn(null);
+ when(peer.getPeerConfig()).thenReturn(peerConfig);
+ userEntry = createEntry(null, a, b, c);
+ filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
+ assertEquals(null, filter.filter(userEntry));
- // 2. Only config table-cfs in peer
+ // 3. Only config table-cfs in peer
// empty map
userEntry = createEntry(null, a, b, c);
Map<TableName, List<String>> tableCfs = new HashMap<>();
- when(peer.getTableCFs()).thenReturn(tableCfs);
+ when(peerConfig.replicateAllUserTables()).thenReturn(false);
+ when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
+ when(peer.getPeerConfig()).thenReturn(peerConfig);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(null, filter.filter(userEntry));
@@ -222,7 +234,9 @@ public class TestReplicationWALEntryFilters {
userEntry = createEntry(null, a, b, c);
tableCfs = new HashMap<>();
tableCfs.put(TableName.valueOf("bar"), null);
- when(peer.getTableCFs()).thenReturn(tableCfs);
+ when(peerConfig.replicateAllUserTables()).thenReturn(false);
+ when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
+ when(peer.getPeerConfig()).thenReturn(peerConfig);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(null, filter.filter(userEntry));
@@ -230,7 +244,9 @@ public class TestReplicationWALEntryFilters {
userEntry = createEntry(null, a, b, c);
tableCfs = new HashMap<>();
tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a"));
- when(peer.getTableCFs()).thenReturn(tableCfs);
+ when(peerConfig.replicateAllUserTables()).thenReturn(false);
+ when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
+ when(peer.getPeerConfig()).thenReturn(peerConfig);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a), filter.filter(userEntry));
@@ -238,7 +254,9 @@ public class TestReplicationWALEntryFilters {
userEntry = createEntry(null, a, b, c, d);
tableCfs = new HashMap<>();
tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c"));
- when(peer.getTableCFs()).thenReturn(tableCfs);
+ when(peerConfig.replicateAllUserTables()).thenReturn(false);
+ when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
+ when(peer.getPeerConfig()).thenReturn(peerConfig);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a,c), filter.filter(userEntry));
@@ -246,14 +264,19 @@ public class TestReplicationWALEntryFilters {
when(peer.getTableCFs()).thenReturn(null);
// empty set
Set<String> namespaces = new HashSet<>();
- when(peer.getNamespaces()).thenReturn(namespaces);
+ when(peerConfig.replicateAllUserTables()).thenReturn(false);
+ when(peerConfig.getNamespaces()).thenReturn(namespaces);
+ when(peerConfig.getTableCFsMap()).thenReturn(null);
+ when(peer.getPeerConfig()).thenReturn(peerConfig);
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(null, filter.filter(userEntry));
// namespace default
namespaces.add("default");
- when(peer.getNamespaces()).thenReturn(namespaces);
+ when(peerConfig.replicateAllUserTables()).thenReturn(false);
+ when(peerConfig.getNamespaces()).thenReturn(namespaces);
+ when(peer.getPeerConfig()).thenReturn(peerConfig);
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a,b,c), filter.filter(userEntry));
@@ -261,7 +284,9 @@ public class TestReplicationWALEntryFilters {
// namespace ns1
namespaces = new HashSet<>();
namespaces.add("ns1");
- when(peer.getNamespaces()).thenReturn(namespaces);
+ when(peerConfig.replicateAllUserTables()).thenReturn(false);
+ when(peerConfig.getNamespaces()).thenReturn(namespaces);
+ when(peer.getPeerConfig()).thenReturn(peerConfig);
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(null, filter.filter(userEntry));
@@ -271,9 +296,11 @@ public class TestReplicationWALEntryFilters {
namespaces = new HashSet<>();
tableCfs = new HashMap<>();
namespaces.add("ns1");
- when(peer.getNamespaces()).thenReturn(namespaces);
tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c"));
- when(peer.getTableCFs()).thenReturn(tableCfs);
+ when(peerConfig.replicateAllUserTables()).thenReturn(false);
+ when(peerConfig.getNamespaces()).thenReturn(namespaces);
+ when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
+ when(peer.getPeerConfig()).thenReturn(peerConfig);
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a, c), filter.filter(userEntry));
@@ -281,9 +308,11 @@ public class TestReplicationWALEntryFilters {
namespaces = new HashSet<>();
tableCfs = new HashMap<>();
namespaces.add("default");
- when(peer.getNamespaces()).thenReturn(namespaces);
tableCfs.put(TableName.valueOf("ns1:foo"), Lists.newArrayList("a", "c"));
- when(peer.getTableCFs()).thenReturn(tableCfs);
+ when(peerConfig.replicateAllUserTables()).thenReturn(false);
+ when(peerConfig.getNamespaces()).thenReturn(namespaces);
+ when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
+ when(peer.getPeerConfig()).thenReturn(peerConfig);
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
@@ -291,9 +320,11 @@ public class TestReplicationWALEntryFilters {
namespaces = new HashSet<>();
tableCfs = new HashMap<>();
namespaces.add("ns1");
- when(peer.getNamespaces()).thenReturn(namespaces);
tableCfs.put(TableName.valueOf("bar"), null);
- when(peer.getTableCFs()).thenReturn(tableCfs);
+ when(peerConfig.replicateAllUserTables()).thenReturn(false);
+ when(peerConfig.getNamespaces()).thenReturn(namespaces);
+ when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
+ when(peer.getPeerConfig()).thenReturn(peerConfig);
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(null, filter.filter(userEntry));
http://git-wip-us.apache.org/repos/asf/hbase/blob/3e2941a4/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java
index cb895ca..e78abfb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java
@@ -50,7 +50,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@Category({ReplicationTests.class, SmallTests.class})
-public class TestTableCFsUpdater extends TableCFsUpdater {
+public class TestTableCFsUpdater extends ReplicationPeerConfigUpgrader {
private static final Log LOG = LogFactory.getLog(TestTableCFsUpdater.class);
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@@ -164,7 +164,7 @@ public class TestTableCFsUpdater extends TableCFsUpdater {
assertNull(actualRpc.getTableCFsMap());
assertNull(actualTableCfs);
- update();
+ copyTableCFs();
peerId = "1";
peerNode = getPeerNode(peerId);
http://git-wip-us.apache.org/repos/asf/hbase/blob/3e2941a4/hbase-shell/src/main/ruby/hbase/replication_admin.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/hbase/replication_admin.rb b/hbase-shell/src/main/ruby/hbase/replication_admin.rb
index 3f64356..50c086a 100644
--- a/hbase-shell/src/main/ruby/hbase/replication_admin.rb
+++ b/hbase-shell/src/main/ruby/hbase/replication_admin.rb
@@ -92,6 +92,7 @@ module Hbase
namespaces.each do |n|
ns_set.add(n)
end
+ replication_peer_config.setReplicateAllUserTables(false)
replication_peer_config.set_namespaces(ns_set)
end
@@ -101,6 +102,7 @@ module Hbase
table_cfs.each do |key, val|
map.put(org.apache.hadoop.hbase.TableName.valueOf(key), val)
end
+ replication_peer_config.setReplicateAllUserTables(false)
replication_peer_config.set_table_cfs_map(map)
end
@@ -265,6 +267,13 @@ module Hbase
end
end
+ def set_peer_replicate_all(id, replicate_all)
+ rpc = @replication_admin.getPeerConfig(id)
+ return if rpc.nil?
+ rpc.setReplicateAllUserTables(replicate_all)
+ @replication_admin.updatePeerConfig(id, rpc)
+ end
+
#----------------------------------------------------------------------------------------------
# Enables a table's replication switch
def enable_tablerep(table_name)
http://git-wip-us.apache.org/repos/asf/hbase/blob/3e2941a4/hbase-shell/src/main/ruby/shell.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell.rb b/hbase-shell/src/main/ruby/shell.rb
index 687af12..60ca229 100644
--- a/hbase-shell/src/main/ruby/shell.rb
+++ b/hbase-shell/src/main/ruby/shell.rb
@@ -377,6 +377,7 @@ Shell.load_command_group(
list_peers
enable_peer
disable_peer
+ set_peer_replicate_all
set_peer_namespaces
append_peer_namespaces
remove_peer_namespaces
http://git-wip-us.apache.org/repos/asf/hbase/blob/3e2941a4/hbase-shell/src/main/ruby/shell/commands/list_peers.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell/commands/list_peers.rb b/hbase-shell/src/main/ruby/shell/commands/list_peers.rb
index 04453c2..6812df4 100644
--- a/hbase-shell/src/main/ruby/shell/commands/list_peers.rb
+++ b/hbase-shell/src/main/ruby/shell/commands/list_peers.rb
@@ -33,7 +33,7 @@ EOF
peers = replication_admin.list_peers
formatter.header(%w[PEER_ID CLUSTER_KEY ENDPOINT_CLASSNAME
- STATE NAMESPACES TABLE_CFS BANDWIDTH])
+ STATE REPLICATE_ALL NAMESPACES TABLE_CFS BANDWIDTH])
peers.each do |peer|
id = peer.getPeerId
@@ -42,7 +42,8 @@ EOF
namespaces = replication_admin.show_peer_namespaces(config)
tableCFs = replication_admin.show_peer_tableCFs(id)
formatter.row([id, config.getClusterKey,
- config.getReplicationEndpointImpl, state, namespaces, tableCFs,
+ config.getReplicationEndpointImpl, state,
+ config.replicateAllUserTables, namespaces, tableCFs,
config.getBandwidth])
end
http://git-wip-us.apache.org/repos/asf/hbase/blob/3e2941a4/hbase-shell/src/main/ruby/shell/commands/set_peer_replicate_all.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell/commands/set_peer_replicate_all.rb b/hbase-shell/src/main/ruby/shell/commands/set_peer_replicate_all.rb
new file mode 100644
index 0000000..f6de615
--- /dev/null
+++ b/hbase-shell/src/main/ruby/shell/commands/set_peer_replicate_all.rb
@@ -0,0 +1,54 @@
+#
+# Copyright The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Shell
+ module Commands
+ class SetPeerReplicateAll < Command
+ def help
+ <<-EOF
+ Set the replicate_all flag to true or false for the specified peer.
+
+ If replicate_all flag is true, then all user tables (REPLICATION_SCOPE != 0)
+ will be replicate to peer cluster.
+
+ If replicate_all flag is false, then all user tables cannot be replicate to
+ peer cluster. Then you can use 'set_peer_namespaces' or 'append_peer_namespaces'
+ to set which namespaces will be replicated to peer cluster. And you can use
+ 'set_peer_tableCFs' or 'append_peer_tableCFs' to set which tables will be
+ replicated to peer cluster.
+
+ Notice: When you want to change a peer's replicate_all flag from false to true,
+ you need clean the peer's NAMESPACES and TABLECFS config firstly.
+
+ Examples:
+
+ # set replicate_all flag to true
+ hbase> set_peer_replicate_all '1', true
+ # set replicate_all flag to false
+ hbase> set_peer_replicate_all '1', false
+EOF
+ end
+
+ def command(id, replicate_all)
+ replication_admin.set_peer_replicate_all(id, replicate_all)
+ end
+ end
+ end
+end
http://git-wip-us.apache.org/repos/asf/hbase/blob/3e2941a4/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
index 75f3c04..4b74ada 100644
--- a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
+++ b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
@@ -73,8 +73,10 @@ module Hbase
command(:add_peer, @peer_id, {CLUSTER_KEY => cluster_key})
assert_equal(1, command(:list_peers).length)
- assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
- assert_equal(cluster_key, command(:list_peers).get(0).getPeerConfig.getClusterKey)
+ peer = command(:list_peers).get(0)
+ assert_equal(@peer_id, peer.getPeerId)
+ assert_equal(cluster_key, peer.getPeerConfig.getClusterKey)
+ assert_equal(true, peer.getPeerConfig.replicateAllUserTables)
# cleanup for future tests
command(:remove_peer, @peer_id)
@@ -86,8 +88,10 @@ module Hbase
command(:add_peer, @peer_id, {CLUSTER_KEY => cluster_key})
assert_equal(1, command(:list_peers).length)
- assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
- assert_equal(cluster_key, command(:list_peers).get(0).getPeerConfig.getClusterKey)
+ peer = command(:list_peers).get(0)
+ assert_equal(@peer_id, peer.getPeerId)
+ assert_equal(cluster_key, peer.getPeerConfig.getClusterKey)
+ assert_equal(true, peer.getPeerConfig.replicateAllUserTables)
# cleanup for future tests
command(:remove_peer, @peer_id)
@@ -131,8 +135,10 @@ module Hbase
command(:add_peer, @peer_id, args)
assert_equal(1, command(:list_peers).length)
- assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
- assert_equal(cluster_key, command(:list_peers).get(0).getPeerConfig.getClusterKey)
+ peer = command(:list_peers).get(0)
+ assert_equal(@peer_id, peer.getPeerId)
+ assert_equal(cluster_key, peer.getPeerConfig.getClusterKey)
+ assert_equal(true, peer.getPeerConfig.replicateAllUserTables)
# cleanup for future tests
command(:remove_peer, @peer_id)
@@ -147,11 +153,13 @@ module Hbase
command(:add_peer, @peer_id, args)
assert_equal(1, command(:list_peers).length)
- assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
- peer_config = command(:list_peers).get(0).getPeerConfig
+ peer = command(:list_peers).get(0)
+ assert_equal(@peer_id, peer.getPeerId)
+ peer_config = peer.getPeerConfig
+ assert_equal(false, peer_config.replicateAllUserTables)
assert_equal(cluster_key, peer_config.get_cluster_key)
assert_equal(namespaces_str,
- replication_admin.show_peer_namespaces(peer_config))
+ replication_admin.show_peer_namespaces(peer_config))
# cleanup for future tests
command(:remove_peer, @peer_id)
@@ -169,8 +177,10 @@ module Hbase
command(:add_peer, @peer_id, args)
assert_equal(1, command(:list_peers).length)
- assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
- peer_config = command(:list_peers).get(0).getPeerConfig
+ peer = command(:list_peers).get(0)
+ assert_equal(@peer_id, peer.getPeerId)
+ peer_config = peer.getPeerConfig
+ assert_equal(false, peer_config.replicateAllUserTables)
assert_equal(cluster_key, peer_config.get_cluster_key)
assert_equal(namespaces_str,
replication_admin.show_peer_namespaces(peer_config))
@@ -203,9 +213,11 @@ module Hbase
command(:add_peer, @peer_id, args)
assert_equal(1, command(:list_peers).length)
- assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
- assert_equal(cluster_key, command(:list_peers).get(0).getPeerConfig.getClusterKey)
- assert_tablecfs_equal(table_cfs, command(:get_peer_config, @peer_id).getTableCFsMap())
+ peer = command(:list_peers).get(0)
+ assert_equal(@peer_id, peer.getPeerId)
+ assert_equal(cluster_key, peer.getPeerConfig.getClusterKey)
+ assert_tablecfs_equal(table_cfs, peer.getPeerConfig.getTableCFsMap)
+ assert_equal(false, peer.getPeerConfig.replicateAllUserTables)
# cleanup for future tests
command(:remove_peer, @peer_id)
@@ -225,10 +237,12 @@ module Hbase
cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
args = { CLUSTER_KEY => cluster_key}
command(:add_peer, @peer_id, args)
+ command(:set_peer_replicate_all, @peer_id, false)
assert_equal(1, command(:list_peers).length)
- assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
- assert_equal(cluster_key, command(:list_peers).get(0).getPeerConfig.getClusterKey)
+ peer = command(:list_peers).get(0)
+ assert_equal(@peer_id, peer.getPeerId)
+ assert_equal(cluster_key, peer.getPeerConfig.getClusterKey)
table_cfs = { "table1" => [], "table2" => ["cf1"], "ns3:table3" => ["cf1", "cf2"] }
command(:set_peer_tableCFs, @peer_id, table_cfs)
@@ -242,10 +256,12 @@ module Hbase
cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
args = { CLUSTER_KEY => cluster_key}
command(:add_peer, @peer_id, args)
+ command(:set_peer_replicate_all, @peer_id, false)
assert_equal(1, command(:list_peers).length)
- assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
- assert_equal(cluster_key, command(:list_peers).get(0).getPeerConfig.getClusterKey)
+ peer = command(:list_peers).get(0)
+ assert_equal(@peer_id, peer.getPeerId)
+ assert_equal(cluster_key, peer.getPeerConfig.getClusterKey)
table_cfs = { "table1" => [], "ns2:table2" => ["cf1"] }
command(:append_peer_tableCFs, @peer_id, table_cfs)
@@ -266,8 +282,9 @@ module Hbase
command(:add_peer, @peer_id, args)
assert_equal(1, command(:list_peers).length)
- assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
- assert_equal(cluster_key, command(:list_peers).get(0).getPeerConfig.getClusterKey)
+ peer = command(:list_peers).get(0)
+ assert_equal(@peer_id, peer.getPeerId)
+ assert_equal(cluster_key, peer.getPeerConfig.getClusterKey)
table_cfs = { "table1" => [], "ns2:table2" => ["cf1"] }
command(:remove_peer_tableCFs, @peer_id, { "ns3:table3" => ["cf1", "cf2"] })
@@ -284,6 +301,7 @@ module Hbase
args = { CLUSTER_KEY => cluster_key }
command(:add_peer, @peer_id, args)
+ command(:set_peer_replicate_all, @peer_id, false)
command(:set_peer_namespaces, @peer_id, namespaces)
@@ -291,7 +309,7 @@ module Hbase
assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
peer_config = command(:list_peers).get(0).getPeerConfig
assert_equal(namespaces_str,
- replication_admin.show_peer_namespaces(peer_config))
+ replication_admin.show_peer_namespaces(peer_config))
# cleanup for future tests
command(:remove_peer, @peer_id)
@@ -304,6 +322,7 @@ module Hbase
args = { CLUSTER_KEY => cluster_key }
command(:add_peer, @peer_id, args)
+ command(:set_peer_replicate_all, @peer_id, false)
command(:append_peer_namespaces, @peer_id, namespaces)
@@ -311,7 +330,7 @@ module Hbase
assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
peer_config = command(:list_peers).get(0).getPeerConfig
assert_equal(namespaces_str,
- replication_admin.show_peer_namespaces(peer_config))
+ replication_admin.show_peer_namespaces(peer_config))
namespaces = ["ns3"]
namespaces_str = "ns1;ns2;ns3"
@@ -321,7 +340,7 @@ module Hbase
assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
peer_config = command(:list_peers).get(0).getPeerConfig
assert_equal(namespaces_str,
- replication_admin.show_peer_namespaces(peer_config))
+ replication_admin.show_peer_namespaces(peer_config))
# append a namespace which is already in the peer config
command(:append_peer_namespaces, @peer_id, namespaces)
@@ -330,7 +349,7 @@ module Hbase
assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
peer_config = command(:list_peers).get(0).getPeerConfig
assert_equal(namespaces_str,
- replication_admin.show_peer_namespaces(peer_config))
+ replication_admin.show_peer_namespaces(peer_config))
# cleanup for future tests
command(:remove_peer, @peer_id)
@@ -351,7 +370,7 @@ module Hbase
assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
peer_config = command(:list_peers).get(0).getPeerConfig
assert_equal(namespaces_str,
- replication_admin.show_peer_namespaces(peer_config))
+ replication_admin.show_peer_namespaces(peer_config))
namespaces = ["ns3"]
namespaces_str = nil
@@ -361,7 +380,7 @@ module Hbase
assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
peer_config = command(:list_peers).get(0).getPeerConfig
assert_equal(namespaces_str,
- replication_admin.show_peer_namespaces(peer_config))
+ replication_admin.show_peer_namespaces(peer_config))
# remove a namespace which is not in peer config
command(:remove_peer_namespaces, @peer_id, namespaces)
@@ -370,12 +389,34 @@ module Hbase
assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
peer_config = command(:list_peers).get(0).getPeerConfig
assert_equal(namespaces_str,
- replication_admin.show_peer_namespaces(peer_config))
+ replication_admin.show_peer_namespaces(peer_config))
# cleanup for future tests
command(:remove_peer, @peer_id)
end
+ define_test 'set_peer_replicate_all' do
+ cluster_key = 'zk4,zk5,zk6:11000:/hbase-test'
+
+ args = { CLUSTER_KEY => cluster_key }
+ command(:add_peer, @peer_id, args)
+
+ assert_equal(1, command(:list_peers).length)
+ peer_config = command(:list_peers).get(0).getPeerConfig
+ assert_equal(true, peer_config.replicateAllUserTables)
+
+ command(:set_peer_replicate_all, @peer_id, false)
+ peer_config = command(:list_peers).get(0).getPeerConfig
+ assert_equal(false, peer_config.replicateAllUserTables)
+
+ command(:set_peer_replicate_all, @peer_id, true)
+ peer_config = command(:list_peers).get(0).getPeerConfig
+ assert_equal(true, peer_config.replicateAllUserTables)
+
+ # cleanup for future tests
+ replication_admin.remove_peer(@peer_id)
+ end
+
define_test "set_peer_bandwidth: works with peer bandwidth upper limit" do
cluster_key = "localhost:2181:/hbase-test"
args = { CLUSTER_KEY => cluster_key }