You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bu...@apache.org on 2016/09/22 16:33:59 UTC
[11/50] [abbrv] hbase git commit: HBASE-16447 Replication by
namespaces config in peer (Guanghao Zhang)
HBASE-16447 Replication by namespaces config in peer (Guanghao Zhang)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/1a1003a4
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/1a1003a4
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/1a1003a4
Branch: refs/heads/hbase-14439
Commit: 1a1003a482d9bfb725fbe1097c794fdb043dcd81
Parents: 2cf8907
Author: Enis Soztutar <en...@apache.org>
Authored: Fri Sep 16 11:47:42 2016 -0700
Committer: Enis Soztutar <en...@apache.org>
Committed: Fri Sep 16 11:47:42 2016 -0700
----------------------------------------------------------------------
.../client/replication/ReplicationAdmin.java | 38 ++-
.../replication/ReplicationSerDeHelper.java | 30 ++-
.../hbase/replication/ReplicationPeer.java | 7 +
.../replication/ReplicationPeerConfig.java | 15 +-
.../replication/ReplicationPeerZKImpl.java | 10 +
.../replication/ReplicationPeersZKImpl.java | 4 +-
.../hbase/zookeeper/ZooKeeperWatcher.java | 2 +-
.../ipc/protobuf/generated/TestProtos.java | 10 +-
.../protobuf/generated/ZooKeeperProtos.java | 186 ++++++++++++--
.../src/main/protobuf/ZooKeeper.proto | 1 +
.../apache/hadoop/hbase/ZKNamespaceManager.java | 2 +-
.../replication/BaseReplicationEndpoint.java | 6 +-
.../NamespaceTableCfWALEntryFilter.java | 126 ++++++++++
.../replication/TableCfWALEntryFilter.java | 101 --------
.../replication/TestReplicationAdmin.java | 84 +++++++
.../replication/TestNamespaceReplication.java | 248 +++++++++++++++++++
.../TestReplicationWALEntryFilters.java | 73 +++++-
...egionReplicaReplicationEndpointNoMaster.java | 3 +
.../src/main/ruby/hbase/replication_admin.rb | 36 +++
hbase-shell/src/main/ruby/hbase_constants.rb | 1 +
hbase-shell/src/main/ruby/shell.rb | 1 +
.../src/main/ruby/shell/commands/add_peer.rb | 16 +-
.../src/main/ruby/shell/commands/list_peers.rb | 7 +-
.../ruby/shell/commands/set_peer_namespaces.rb | 51 ++++
.../ruby/shell/commands/set_peer_tableCFs.rb | 10 +-
.../test/ruby/hbase/replication_admin_test.rb | 69 +++++-
26 files changed, 993 insertions(+), 144 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
index de6cb7f..dc1a7ad 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
@@ -189,6 +189,8 @@ public class ReplicationAdmin implements Closeable {
* @param peerConfig configuration for the replication slave cluster
*/
public void addPeer(String id, ReplicationPeerConfig peerConfig) throws ReplicationException {
+ checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
+ peerConfig.getTableCFsMap());
this.replicationPeers.registerPeer(id, peerConfig);
}
@@ -202,8 +204,11 @@ public class ReplicationAdmin implements Closeable {
public void updatePeerConfig(String id, ReplicationPeerConfig peerConfig)
throws ReplicationException {
+ checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
+ peerConfig.getTableCFsMap());
this.replicationPeers.updatePeerConfig(id, peerConfig);
}
+
/**
* Removes a peer cluster and stops the replication to it.
* @param id a short name that identifies the cluster
@@ -360,7 +365,6 @@ public class ReplicationAdmin implements Closeable {
}
} else {
throw new ReplicationException("No table: " + table + " in table-cfs config of peer: " + id);
-
}
}
setPeerTableCFs(id, preTableCfs);
@@ -376,6 +380,8 @@ public class ReplicationAdmin implements Closeable {
*/
public void setPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
throws ReplicationException {
+ checkNamespacesAndTableCfsConfigConflict(
+ this.replicationPeers.getReplicationPeerConfig(id).getNamespaces(), tableCfs);
this.replicationPeers.setPeerTableCFsConfig(id, tableCfs);
}
@@ -627,4 +633,34 @@ public class ReplicationAdmin implements Closeable {
}
return true;
}
+
+ /**
+ * Set a namespace in the peer config means that all tables in this namespace
+ * will be replicated to the peer cluster.
+ *
+ * 1. If you already have set a namespace in the peer config, then you can't set any table
+ * of this namespace to the peer config.
+ * 2. If you already have set a table in the peer config, then you can't set this table's
+ * namespace to the peer config.
+ *
+ * @param namespaces
+ * @param tableCfs
+ * @throws ReplicationException
+ */
+ private void checkNamespacesAndTableCfsConfigConflict(Set<String> namespaces,
+ Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException {
+ if (namespaces == null || namespaces.isEmpty()) {
+ return;
+ }
+ if (tableCfs == null || tableCfs.isEmpty()) {
+ return;
+ }
+ for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
+ TableName table = entry.getKey();
+ if (namespaces.contains(table.getNamespaceAsString())) {
+ throw new ReplicationException(
+ "Table-cfs config conflict with namespaces config in peer");
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java
index 9682f89..225e685 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.hbase.client.replication;
import com.google.protobuf.ByteString;
+
+import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.TableName;
@@ -34,10 +36,12 @@ import org.apache.hadoop.hbase.util.Strings;
import java.io.IOException;
import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.ArrayList;
+import java.util.Set;
/**
* Helper for TableCFs Operations.
@@ -50,6 +54,13 @@ public final class ReplicationSerDeHelper {
private ReplicationSerDeHelper() {}
+ public static String convertToString(Set<String> namespaces) {
+ if (namespaces == null) {
+ return null;
+ }
+ return StringUtils.join(namespaces, ';');
+ }
+
/** convert map to TableCFs Object */
public static ZooKeeperProtos.TableCF[] convert(
Map<TableName, ? extends Collection<String>> tableCfs) {
@@ -262,11 +273,21 @@ public final class ReplicationSerDeHelper {
for (HBaseProtos.NameStringPair pair : peer.getConfigurationList()) {
peerConfig.getConfiguration().put(pair.getName(), pair.getValue());
}
+
Map<TableName, ? extends Collection<String>> tableCFsMap = convert2Map(
peer.getTableCfsList().toArray(new ZooKeeperProtos.TableCF[peer.getTableCfsCount()]));
if (tableCFsMap != null) {
peerConfig.setTableCFsMap(tableCFsMap);
}
+
+ List<ByteString> namespacesList = peer.getNamespacesList();
+ if (namespacesList != null && namespacesList.size() != 0) {
+ Set<String> namespaces = new HashSet<String>();
+ for (ByteString namespace : namespacesList) {
+ namespaces.add(namespace.toStringUtf8());
+ }
+ peerConfig.setNamespaces(namespaces);
+ }
return peerConfig;
}
@@ -292,12 +313,20 @@ public final class ReplicationSerDeHelper {
.setValue(entry.getValue())
.build());
}
+
ZooKeeperProtos.TableCF[] tableCFs = convert(peerConfig.getTableCFsMap());
if (tableCFs != null) {
for (int i = 0; i < tableCFs.length; i++) {
builder.addTableCfs(tableCFs[i]);
}
}
+ Set<String> namespaces = peerConfig.getNamespaces();
+ if (namespaces != null) {
+ for (String namespace : namespaces) {
+ builder.addNamespaces(ByteString.copyFromUtf8(namespace));
+ }
+ }
+
return builder.build();
}
@@ -311,5 +340,4 @@ public final class ReplicationSerDeHelper {
byte[] bytes = convert(peerConfig).toByteArray();
return ProtobufUtil.prependPBMagic(bytes);
}
-
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
index 3da01fe..bd2b700 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.replication;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -71,6 +72,12 @@ public interface ReplicationPeer {
*/
public Map<TableName, List<String>> getTableCFs();
+ /**
+ * Get replicable namespace set of this peer
+ * @return the replicable namespaces set
+ */
+ public Set<String> getNamespaces();
+
void trackPeerConfigChanges(ReplicationPeerConfigListener listener);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
index 1d2066c..1f0d085 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
@@ -22,6 +22,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.TreeMap;
import org.apache.hadoop.hbase.TableName;
@@ -42,7 +43,7 @@ public class ReplicationPeerConfig {
private final Map<byte[], byte[]> peerData;
private final Map<String, String> configuration;
private Map<TableName, ? extends Collection<String>> tableCFsMap = null;
-
+ private Set<String> namespaces = null;
public ReplicationPeerConfig() {
this.peerData = new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
@@ -93,10 +94,22 @@ public class ReplicationPeerConfig {
return this;
}
+ public Set<String> getNamespaces() {
+ return this.namespaces;
+ }
+
+ public ReplicationPeerConfig setNamespaces(Set<String> namespaces) {
+ this.namespaces = namespaces;
+ return this;
+ }
+
@Override
public String toString() {
StringBuilder builder = new StringBuilder("clusterKey=").append(clusterKey).append(",");
builder.append("replicationEndpointImpl=").append(replicationEndpointImpl).append(",");
+ if (namespaces != null) {
+ builder.append("namespaces=").append(namespaces.toString()).append(",");
+ }
if (tableCFsMap != null) {
builder.append("tableCFs=").append(tableCFsMap.toString());
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
index a33690c..cfe543a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -163,6 +164,15 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase
return this.tableCFs;
}
+ /**
+ * Get replicable namespace set of this peer
+ * @return the replicable namespaces set
+ */
+ @Override
+ public Set<String> getNamespaces() {
+ return this.peerConfig.getNamespaces();
+ }
+
@Override
public void trackPeerConfigChanges(ReplicationPeerConfigListener listener) {
if (this.peerConfigTracker != null){
http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
index 54c2dac..90b1347 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
@@ -343,7 +343,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
throws ReplicationException {
ReplicationPeer peer = getConnectedPeer(id);
if (peer == null){
- throw new ReplicationException("Could not find peer Id " + id);
+ throw new ReplicationException("Could not find peer Id " + id + " in connected peers");
}
ReplicationPeerConfig existingConfig = peer.getPeerConfig();
if (newConfig.getClusterKey() != null && !newConfig.getClusterKey().isEmpty() &&
@@ -366,6 +366,8 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
// or data that weren't explicitly changed
existingConfig.getConfiguration().putAll(newConfig.getConfiguration());
existingConfig.getPeerData().putAll(newConfig.getPeerData());
+ existingConfig.setTableCFsMap(newConfig.getTableCFsMap());
+ existingConfig.setNamespaces(newConfig.getNamespaces());
try {
ZKUtil.setData(this.zookeeper, getPeerNode(id),
http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
index 5ef7171..f7d7e26 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
@@ -122,7 +122,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
// znode containing the state of recovering regions
public String recoveringRegionsZNode;
// znode containing namespace descriptors
- public static String namespaceZNode = "namespace";
+ public String namespaceZNode = "namespace";
// znode of indicating master maintenance mode
public static String masterMaintZNode = "masterMaintenance";
http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-protocol/src/main/java/org/apache/hadoop/hbase/ipc/protobuf/generated/TestProtos.java
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/ipc/protobuf/generated/TestProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/ipc/protobuf/generated/TestProtos.java
index d28945c..58e248e 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/ipc/protobuf/generated/TestProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/ipc/protobuf/generated/TestProtos.java
@@ -2091,7 +2091,7 @@ public final class TestProtos {
public final boolean isInitialized() {
if (!hasMs()) {
-
+
return false;
}
return true;
@@ -2291,7 +2291,7 @@ public final class TestProtos {
if (ref instanceof java.lang.String) {
return (java.lang.String) ref;
} else {
- com.google.protobuf.ByteString bs =
+ com.google.protobuf.ByteString bs =
(com.google.protobuf.ByteString) ref;
java.lang.String s = bs.toStringUtf8();
if (bs.isValidUtf8()) {
@@ -2307,7 +2307,7 @@ public final class TestProtos {
getAddrBytes() {
java.lang.Object ref = addr_;
if (ref instanceof java.lang.String) {
- com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString b =
com.google.protobuf.ByteString.copyFromUtf8(
(java.lang.String) ref);
addr_ = b;
@@ -2567,7 +2567,7 @@ public final class TestProtos {
public final boolean isInitialized() {
if (!hasAddr()) {
-
+
return false;
}
return true;
@@ -2621,7 +2621,7 @@ public final class TestProtos {
getAddrBytes() {
java.lang.Object ref = addr_;
if (ref instanceof String) {
- com.google.protobuf.ByteString b =
+ com.google.protobuf.ByteString b =
com.google.protobuf.ByteString.copyFromUtf8(
(java.lang.String) ref);
addr_ = b;
http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
index f64d0c1..d7de638 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
@@ -4782,6 +4782,20 @@ public final class ZooKeeperProtos {
*/
org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCFOrBuilder getTableCfsOrBuilder(
int index);
+
+ // repeated bytes namespaces = 6;
+ /**
+ * <code>repeated bytes namespaces = 6;</code>
+ */
+ java.util.List<com.google.protobuf.ByteString> getNamespacesList();
+ /**
+ * <code>repeated bytes namespaces = 6;</code>
+ */
+ int getNamespacesCount();
+ /**
+ * <code>repeated bytes namespaces = 6;</code>
+ */
+ com.google.protobuf.ByteString getNamespaces(int index);
}
/**
* Protobuf type {@code hbase.pb.ReplicationPeer}
@@ -4873,6 +4887,14 @@ public final class ZooKeeperProtos {
tableCfs_.add(input.readMessage(org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableCF.PARSER, extensionRegistry));
break;
}
+ case 50: {
+ if (!((mutable_bitField0_ & 0x00000020) == 0x00000020)) {
+ namespaces_ = new java.util.ArrayList<com.google.protobuf.ByteString>();
+ mutable_bitField0_ |= 0x00000020;
+ }
+ namespaces_.add(input.readBytes());
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -4890,6 +4912,9 @@ public final class ZooKeeperProtos {
if (((mutable_bitField0_ & 0x00000010) == 0x00000010)) {
tableCfs_ = java.util.Collections.unmodifiableList(tableCfs_);
}
+ if (((mutable_bitField0_ & 0x00000020) == 0x00000020)) {
+ namespaces_ = java.util.Collections.unmodifiableList(namespaces_);
+ }
this.unknownFields = unknownFields.build();
makeExtensionsImmutable();
}
@@ -5131,12 +5156,36 @@ public final class ZooKeeperProtos {
return tableCfs_.get(index);
}
+ // repeated bytes namespaces = 6;
+ public static final int NAMESPACES_FIELD_NUMBER = 6;
+ private java.util.List<com.google.protobuf.ByteString> namespaces_;
+ /**
+ * <code>repeated bytes namespaces = 6;</code>
+ */
+ public java.util.List<com.google.protobuf.ByteString>
+ getNamespacesList() {
+ return namespaces_;
+ }
+ /**
+ * <code>repeated bytes namespaces = 6;</code>
+ */
+ public int getNamespacesCount() {
+ return namespaces_.size();
+ }
+ /**
+ * <code>repeated bytes namespaces = 6;</code>
+ */
+ public com.google.protobuf.ByteString getNamespaces(int index) {
+ return namespaces_.get(index);
+ }
+
private void initFields() {
clusterkey_ = "";
replicationEndpointImpl_ = "";
data_ = java.util.Collections.emptyList();
configuration_ = java.util.Collections.emptyList();
tableCfs_ = java.util.Collections.emptyList();
+ namespaces_ = java.util.Collections.emptyList();
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -5187,6 +5236,9 @@ public final class ZooKeeperProtos {
for (int i = 0; i < tableCfs_.size(); i++) {
output.writeMessage(5, tableCfs_.get(i));
}
+ for (int i = 0; i < namespaces_.size(); i++) {
+ output.writeBytes(6, namespaces_.get(i));
+ }
getUnknownFields().writeTo(output);
}
@@ -5216,6 +5268,15 @@ public final class ZooKeeperProtos {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(5, tableCfs_.get(i));
}
+ {
+ int dataSize = 0;
+ for (int i = 0; i < namespaces_.size(); i++) {
+ dataSize += com.google.protobuf.CodedOutputStream
+ .computeBytesSizeNoTag(namespaces_.get(i));
+ }
+ size += dataSize;
+ size += 1 * getNamespacesList().size();
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -5255,6 +5316,8 @@ public final class ZooKeeperProtos {
.equals(other.getConfigurationList());
result = result && getTableCfsList()
.equals(other.getTableCfsList());
+ result = result && getNamespacesList()
+ .equals(other.getNamespacesList());
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -5288,6 +5351,10 @@ public final class ZooKeeperProtos {
hash = (37 * hash) + TABLE_CFS_FIELD_NUMBER;
hash = (53 * hash) + getTableCfsList().hashCode();
}
+ if (getNamespacesCount() > 0) {
+ hash = (37 * hash) + NAMESPACES_FIELD_NUMBER;
+ hash = (53 * hash) + getNamespacesList().hashCode();
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -5427,6 +5494,8 @@ public final class ZooKeeperProtos {
} else {
tableCfsBuilder_.clear();
}
+ namespaces_ = java.util.Collections.emptyList();
+ bitField0_ = (bitField0_ & ~0x00000020);
return this;
}
@@ -5490,6 +5559,11 @@ public final class ZooKeeperProtos {
} else {
result.tableCfs_ = tableCfsBuilder_.build();
}
+ if (((bitField0_ & 0x00000020) == 0x00000020)) {
+ namespaces_ = java.util.Collections.unmodifiableList(namespaces_);
+ bitField0_ = (bitField0_ & ~0x00000020);
+ }
+ result.namespaces_ = namespaces_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -5594,6 +5668,16 @@ public final class ZooKeeperProtos {
}
}
}
+ if (!other.namespaces_.isEmpty()) {
+ if (namespaces_.isEmpty()) {
+ namespaces_ = other.namespaces_;
+ bitField0_ = (bitField0_ & ~0x00000020);
+ } else {
+ ensureNamespacesIsMutable();
+ namespaces_.addAll(other.namespaces_);
+ }
+ onChanged();
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -6541,6 +6625,78 @@ public final class ZooKeeperProtos {
return tableCfsBuilder_;
}
+ // repeated bytes namespaces = 6;
+ private java.util.List<com.google.protobuf.ByteString> namespaces_ = java.util.Collections.emptyList();
+ private void ensureNamespacesIsMutable() {
+ if (!((bitField0_ & 0x00000020) == 0x00000020)) {
+ namespaces_ = new java.util.ArrayList<com.google.protobuf.ByteString>(namespaces_);
+ bitField0_ |= 0x00000020;
+ }
+ }
+ /**
+ * <code>repeated bytes namespaces = 6;</code>
+ */
+ public java.util.List<com.google.protobuf.ByteString>
+ getNamespacesList() {
+ return java.util.Collections.unmodifiableList(namespaces_);
+ }
+ /**
+ * <code>repeated bytes namespaces = 6;</code>
+ */
+ public int getNamespacesCount() {
+ return namespaces_.size();
+ }
+ /**
+ * <code>repeated bytes namespaces = 6;</code>
+ */
+ public com.google.protobuf.ByteString getNamespaces(int index) {
+ return namespaces_.get(index);
+ }
+ /**
+ * <code>repeated bytes namespaces = 6;</code>
+ */
+ public Builder setNamespaces(
+ int index, com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensureNamespacesIsMutable();
+ namespaces_.set(index, value);
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>repeated bytes namespaces = 6;</code>
+ */
+ public Builder addNamespaces(com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensureNamespacesIsMutable();
+ namespaces_.add(value);
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>repeated bytes namespaces = 6;</code>
+ */
+ public Builder addAllNamespaces(
+ java.lang.Iterable<? extends com.google.protobuf.ByteString> values) {
+ ensureNamespacesIsMutable();
+ super.addAll(values, namespaces_);
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>repeated bytes namespaces = 6;</code>
+ */
+ public Builder clearNamespaces() {
+ namespaces_ = java.util.Collections.emptyList();
+ bitField0_ = (bitField0_ & ~0x00000020);
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:hbase.pb.ReplicationPeer)
}
@@ -9822,24 +9978,24 @@ public final class ZooKeeperProtos {
"e:\007ENABLED\"?\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010DISA" +
"BLED\020\001\022\r\n\tDISABLING\020\002\022\014\n\010ENABLING\020\003\"D\n\007T" +
"ableCF\022\'\n\ntable_name\030\001 \001(\0132\023.hbase.pb.Ta",
- "bleName\022\020\n\010families\030\002 \003(\014\"\305\001\n\017Replicatio" +
+ "bleName\022\020\n\010families\030\002 \003(\014\"\331\001\n\017Replicatio" +
"nPeer\022\022\n\nclusterkey\030\001 \002(\t\022\037\n\027replication" +
"EndpointImpl\030\002 \001(\t\022&\n\004data\030\003 \003(\0132\030.hbase" +
".pb.BytesBytesPair\022/\n\rconfiguration\030\004 \003(" +
"\0132\030.hbase.pb.NameStringPair\022$\n\ttable_cfs" +
- "\030\005 \003(\0132\021.hbase.pb.TableCF\"g\n\020Replication" +
- "State\022/\n\005state\030\001 \002(\0162 .hbase.pb.Replicat" +
- "ionState.State\"\"\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010" +
- "DISABLED\020\001\"+\n\027ReplicationHLogPosition\022\020\n" +
- "\010position\030\001 \002(\003\"%\n\017ReplicationLock\022\022\n\nlo",
- "ck_owner\030\001 \002(\t\"\252\001\n\tTableLock\022\'\n\ntable_na" +
- "me\030\001 \001(\0132\023.hbase.pb.TableName\022(\n\nlock_ow" +
- "ner\030\002 \001(\0132\024.hbase.pb.ServerName\022\021\n\tthrea" +
- "d_id\030\003 \001(\003\022\021\n\tis_shared\030\004 \001(\010\022\017\n\007purpose" +
- "\030\005 \001(\t\022\023\n\013create_time\030\006 \001(\003\"\036\n\013SwitchSta" +
- "te\022\017\n\007enabled\030\001 \001(\010BE\n*org.apache.hadoop" +
- ".hbase.protobuf.generatedB\017ZooKeeperProt" +
- "osH\001\210\001\001\240\001\001"
+ "\030\005 \003(\0132\021.hbase.pb.TableCF\022\022\n\nnamespaces\030" +
+ "\006 \003(\014\"g\n\020ReplicationState\022/\n\005state\030\001 \002(\016" +
+ "2 .hbase.pb.ReplicationState.State\"\"\n\005St" +
+ "ate\022\013\n\007ENABLED\020\000\022\014\n\010DISABLED\020\001\"+\n\027Replic" +
+ "ationHLogPosition\022\020\n\010position\030\001 \002(\003\"%\n\017R",
+ "eplicationLock\022\022\n\nlock_owner\030\001 \002(\t\"\252\001\n\tT" +
+ "ableLock\022\'\n\ntable_name\030\001 \001(\0132\023.hbase.pb." +
+ "TableName\022(\n\nlock_owner\030\002 \001(\0132\024.hbase.pb" +
+ ".ServerName\022\021\n\tthread_id\030\003 \001(\003\022\021\n\tis_sha" +
+ "red\030\004 \001(\010\022\017\n\007purpose\030\005 \001(\t\022\023\n\013create_tim" +
+ "e\030\006 \001(\003\"\036\n\013SwitchState\022\017\n\007enabled\030\001 \001(\010B" +
+ "E\n*org.apache.hadoop.hbase.protobuf.gene" +
+ "ratedB\017ZooKeeperProtosH\001\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -9887,7 +10043,7 @@ public final class ZooKeeperProtos {
internal_static_hbase_pb_ReplicationPeer_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_hbase_pb_ReplicationPeer_descriptor,
- new java.lang.String[] { "Clusterkey", "ReplicationEndpointImpl", "Data", "Configuration", "TableCfs", });
+ new java.lang.String[] { "Clusterkey", "ReplicationEndpointImpl", "Data", "Configuration", "TableCfs", "Namespaces", });
internal_static_hbase_pb_ReplicationState_descriptor =
getDescriptor().getMessageTypes().get(7);
internal_static_hbase_pb_ReplicationState_fieldAccessorTable = new
http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-protocol/src/main/protobuf/ZooKeeper.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/ZooKeeper.proto b/hbase-protocol/src/main/protobuf/ZooKeeper.proto
index 8713cbd..ea8f747 100644
--- a/hbase-protocol/src/main/protobuf/ZooKeeper.proto
+++ b/hbase-protocol/src/main/protobuf/ZooKeeper.proto
@@ -121,6 +121,7 @@ message ReplicationPeer {
repeated BytesBytesPair data = 3;
repeated NameStringPair configuration = 4;
repeated TableCF table_cfs = 5;
+ repeated bytes namespaces = 6;
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-server/src/main/java/org/apache/hadoop/hbase/ZKNamespaceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ZKNamespaceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ZKNamespaceManager.java
index ee59c01..7b53333 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ZKNamespaceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ZKNamespaceManager.java
@@ -54,7 +54,7 @@ public class ZKNamespaceManager extends ZooKeeperListener {
public ZKNamespaceManager(ZooKeeperWatcher zkw) throws IOException {
super(zkw);
- nsZNode = ZooKeeperWatcher.namespaceZNode;
+ nsZNode = zkw.namespaceZNode;
cache = new ConcurrentSkipListMap<String, NamespaceDescriptor>();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
index d667269..48f3ac5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
@@ -72,7 +72,7 @@ public abstract class BaseReplicationEndpoint extends AbstractService
if (scopeFilter != null) {
filters.add(scopeFilter);
}
- WALEntryFilter tableCfFilter = getTableCfWALEntryFilter();
+ WALEntryFilter tableCfFilter = getNamespaceTableCfWALEntryFilter();
if (tableCfFilter != null) {
filters.add(tableCfFilter);
}
@@ -87,8 +87,8 @@ public abstract class BaseReplicationEndpoint extends AbstractService
/** Returns a WALEntryFilter for checking replication per table and CF. Subclasses can
* return null if they don't want this filter */
- protected WALEntryFilter getTableCfWALEntryFilter() {
- return new TableCfWALEntryFilter(ctx.getReplicationPeer());
+ protected WALEntryFilter getNamespaceTableCfWALEntryFilter() {
+ return new NamespaceTableCfWALEntryFilter(ctx.getReplicationPeer());
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java
new file mode 100644
index 0000000..2673cbd
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+
+import com.google.common.base.Predicate;
+
+/**
+ * Filter a WAL Entry by namespaces and table-cfs config in the peer. It first filter entry
+ * by namespaces config, then filter entry by table-cfs config.
+ *
+ * 1. Set a namespace in peer config means that all tables in this namespace will be replicated.
+ * 2. If the namespaces config is null, then the table-cfs config decide which table's edit
+ * can be replicated. If the table-cfs config is null, then the namespaces config decide
+ * which table's edit can be replicated.
+ */
+@InterfaceAudience.Private
+public class NamespaceTableCfWALEntryFilter implements WALEntryFilter, WALCellFilter {
+
+ private static final Log LOG = LogFactory.getLog(NamespaceTableCfWALEntryFilter.class);
+ private final ReplicationPeer peer;
+ private BulkLoadCellFilter bulkLoadFilter = new BulkLoadCellFilter();
+
+ public NamespaceTableCfWALEntryFilter(ReplicationPeer peer) {
+ this.peer = peer;
+ }
+
+ @Override
+ public Entry filter(Entry entry) {
+ TableName tabName = entry.getKey().getTablename();
+ String namespace = tabName.getNamespaceAsString();
+ Set<String> namespaces = this.peer.getNamespaces();
+ Map<TableName, List<String>> tableCFs = getTableCfs();
+
+ // If null means user has explicitly not configured any namespaces and table CFs
+ // so all the tables data are applicable for replication
+ if (namespaces == null && tableCFs == null) {
+ return entry;
+ }
+
+ // First filter by namespaces config
+ // If table's namespace in peer config, all the tables data are applicable for replication
+ if (namespaces != null && namespaces.contains(namespace)) {
+ return entry;
+ }
+
+ // Then filter by table-cfs config
+ // return null(prevent replicating) if logKey's table isn't in this peer's
+ // replicaable namespace list and table list
+ if (tableCFs == null || !tableCFs.containsKey(tabName)) {
+ return null;
+ }
+
+ return entry;
+ }
+
+ @Override
+ public Cell filterCell(final Entry entry, Cell cell) {
+ final Map<TableName, List<String>> tableCfs = getTableCfs();
+ if (tableCfs == null) return cell;
+ TableName tabName = entry.getKey().getTablename();
+ List<String> cfs = tableCfs.get(tabName);
+ // ignore(remove) kv if its cf isn't in the replicable cf list
+ // (empty cfs means all cfs of this table are replicable)
+ if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
+ cell = bulkLoadFilter.filterCell(cell, new Predicate<byte[]>() {
+ @Override
+ public boolean apply(byte[] fam) {
+ if (tableCfs != null) {
+ List<String> cfs = tableCfs.get(entry.getKey().getTablename());
+ if (cfs != null && !cfs.contains(Bytes.toString(fam))) {
+ return true;
+ }
+ }
+ return false;
+ }
+ });
+ } else {
+ if ((cfs != null) && !cfs.contains(
+ Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()))) {
+ return null;
+ }
+ }
+ return cell;
+ }
+
+ Map<TableName, List<String>> getTableCfs() {
+ Map<TableName, List<String>> tableCFs = null;
+ try {
+ tableCFs = this.peer.getTableCFs();
+ } catch (IllegalArgumentException e) {
+ LOG.error("should not happen: can't get tableCFs for peer " + peer.getId() +
+ ", degenerate as if it's not configured by keeping tableCFs==null");
+ }
+ return tableCFs;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
deleted file mode 100644
index d890e3e..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.replication;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
-
-import com.google.common.base.Predicate;
-
-public class TableCfWALEntryFilter implements WALEntryFilter, WALCellFilter {
-
- private static final Log LOG = LogFactory.getLog(TableCfWALEntryFilter.class);
- private ReplicationPeer peer;
- private BulkLoadCellFilter bulkLoadFilter = new BulkLoadCellFilter();
-
- public TableCfWALEntryFilter(ReplicationPeer peer) {
- this.peer = peer;
- }
-
- @Override
- public Entry filter(Entry entry) {
- TableName tabName = entry.getKey().getTablename();
- Map<TableName, List<String>> tableCFs = getTableCfs();
-
- // If null means user has explicitly not configured any table CFs so all the tables data are
- // applicable for replication
- if (tableCFs == null) return entry;
-
- if (!tableCFs.containsKey(tabName)) {
- return null;
- }
-
- return entry;
- }
-
- @Override
- public Cell filterCell(final Entry entry, Cell cell) {
- final Map<TableName, List<String>> tableCfs = getTableCfs();
- if (tableCfs == null) return cell;
- TableName tabName = entry.getKey().getTablename();
- List<String> cfs = tableCfs.get(tabName);
- // ignore(remove) kv if its cf isn't in the replicable cf list
- // (empty cfs means all cfs of this table are replicable)
- if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
- cell = bulkLoadFilter.filterCell(cell, new Predicate<byte[]>() {
- @Override
- public boolean apply(byte[] fam) {
- if (tableCfs != null) {
- List<String> cfs = tableCfs.get(entry.getKey().getTablename());
- if (cfs != null && !cfs.contains(Bytes.toString(fam))) {
- return true;
- }
- }
- return false;
- }
- });
- } else {
- if ((cfs != null) && !cfs.contains(
- Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()))) {
- return null;
- }
- }
- return cell;
- }
-
- Map<TableName, List<String>> getTableCfs() {
- Map<TableName, List<String>> tableCFs = null;
- try {
- tableCFs = this.peer.getTableCFs();
- } catch (IllegalArgumentException e) {
- LOG.error("should not happen: can't get tableCFs for peer " + peer.getId() +
- ", degenerate as if it's not configured by keeping tableCFs==null");
- }
- return tableCFs;
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
index 85820af..c0d18dd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
@@ -20,8 +20,10 @@ package org.apache.hadoop.hbase.client.replication;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -385,4 +387,86 @@ public class TestReplicationAdmin {
admin.removePeer(ID_ONE);
}
+
+ @Test
+ public void testSetPeerNamespaces() throws Exception {
+ String ns1 = "ns1";
+ String ns2 = "ns2";
+
+ ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+ rpc.setClusterKey(KEY_ONE);
+ admin.addPeer(ID_ONE, rpc);
+ admin.peerAdded(ID_ONE);
+
+ rpc = admin.getPeerConfig(ID_ONE);
+ Set<String> namespaces = new HashSet<String>();
+ namespaces.add(ns1);
+ namespaces.add(ns2);
+ rpc.setNamespaces(namespaces);
+ admin.updatePeerConfig(ID_ONE, rpc);
+ namespaces = admin.getPeerConfig(ID_ONE).getNamespaces();
+ assertEquals(2, namespaces.size());
+ assertTrue(namespaces.contains(ns1));
+ assertTrue(namespaces.contains(ns2));
+
+ rpc = admin.getPeerConfig(ID_ONE);
+ namespaces.clear();
+ namespaces.add(ns1);
+ rpc.setNamespaces(namespaces);
+ admin.updatePeerConfig(ID_ONE, rpc);
+ namespaces = admin.getPeerConfig(ID_ONE).getNamespaces();
+ assertEquals(1, namespaces.size());
+ assertTrue(namespaces.contains(ns1));
+
+ admin.removePeer(ID_ONE);
+ }
+
+ @Test
+ public void testNamespacesAndTableCfsConfigConflict() throws ReplicationException {
+ String ns1 = "ns1";
+ String ns2 = "ns2";
+ TableName tab1 = TableName.valueOf("ns1:tabl");
+ TableName tab2 = TableName.valueOf("ns2:tab2");
+
+ ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+ rpc.setClusterKey(KEY_ONE);
+ admin.addPeer(ID_ONE, rpc);
+ admin.peerAdded(ID_ONE);
+
+ rpc = admin.getPeerConfig(ID_ONE);
+ Set<String> namespaces = new HashSet<String>();
+ namespaces.add(ns1);
+ rpc.setNamespaces(namespaces);
+ admin.updatePeerConfig(ID_ONE, rpc);
+ rpc = admin.getPeerConfig(ID_ONE);
+ Map<TableName, List<String>> tableCfs = new HashMap<>();
+ tableCfs.put(tab1, new ArrayList<String>());
+ rpc.setTableCFsMap(tableCfs);
+ try {
+ admin.updatePeerConfig(ID_ONE, rpc);
+ fail("Should throw ReplicationException, because table " + tab1 + " conflict with namespace "
+ + ns1);
+ } catch (ReplicationException e) {
+ // OK
+ }
+
+ rpc = admin.getPeerConfig(ID_ONE);
+ tableCfs.clear();
+ tableCfs.put(tab2, new ArrayList<String>());
+ rpc.setTableCFsMap(tableCfs);
+ admin.updatePeerConfig(ID_ONE, rpc);
+ rpc = admin.getPeerConfig(ID_ONE);
+ namespaces.clear();
+ namespaces.add(ns2);
+ rpc.setNamespaces(namespaces);
+ try {
+ admin.updatePeerConfig(ID_ONE, rpc);
+ fail("Should throw ReplicationException, because namespace " + ns2 + " conflict with table "
+ + tab2);
+ } catch (ReplicationException e) {
+ // OK
+ }
+
+ admin.removePeer(ID_ONE);
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplication.java
new file mode 100644
index 0000000..ee9b0cb
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplication.java
@@ -0,0 +1,248 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({MediumTests.class})
+public class TestNamespaceReplication extends TestReplicationBase {
+
+ private static final Log LOG = LogFactory.getLog(TestNamespaceReplication.class);
+
+ private static String ns1 = "ns1";
+ private static String ns2 = "ns2";
+
+ private static final TableName tabAName = TableName.valueOf("ns1:TA");
+ private static final TableName tabBName = TableName.valueOf("ns2:TB");
+
+ private static final byte[] f1Name = Bytes.toBytes("f1");
+ private static final byte[] f2Name = Bytes.toBytes("f2");
+
+ private static final byte[] val = Bytes.toBytes("myval");
+
+ private static HTableDescriptor tabA;
+ private static HTableDescriptor tabB;
+
+ private static Connection connection1;
+ private static Connection connection2;
+ private static Admin admin1;
+ private static Admin admin2;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TestReplicationBase.setUpBeforeClass();
+
+ connection1 = ConnectionFactory.createConnection(conf1);
+ connection2 = ConnectionFactory.createConnection(conf2);
+ admin1 = connection1.getAdmin();
+ admin2 = connection2.getAdmin();
+
+ admin1.createNamespace(NamespaceDescriptor.create(ns1).build());
+ admin1.createNamespace(NamespaceDescriptor.create(ns2).build());
+ admin2.createNamespace(NamespaceDescriptor.create(ns1).build());
+ admin2.createNamespace(NamespaceDescriptor.create(ns2).build());
+
+ tabA = new HTableDescriptor(tabAName);
+ HColumnDescriptor fam = new HColumnDescriptor(f1Name);
+ fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+ tabA.addFamily(fam);
+ fam = new HColumnDescriptor(f2Name);
+ fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+ tabA.addFamily(fam);
+ admin1.createTable(tabA);
+ admin2.createTable(tabA);
+
+ tabB = new HTableDescriptor(tabBName);
+ fam = new HColumnDescriptor(f1Name);
+ fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+ tabB.addFamily(fam);
+ fam = new HColumnDescriptor(f2Name);
+ fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+ tabB.addFamily(fam);
+ admin1.createTable(tabB);
+ admin2.createTable(tabB);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ admin1.disableTable(tabAName);
+ admin1.deleteTable(tabAName);
+ admin1.disableTable(tabBName);
+ admin1.deleteTable(tabBName);
+ admin2.disableTable(tabAName);
+ admin2.deleteTable(tabAName);
+ admin2.disableTable(tabBName);
+ admin2.deleteTable(tabBName);
+
+ admin1.deleteNamespace(ns1);
+ admin1.deleteNamespace(ns2);
+ admin2.deleteNamespace(ns1);
+ admin2.deleteNamespace(ns2);
+
+ connection1.close();
+ connection2.close();
+ TestReplicationBase.tearDownAfterClass();
+ }
+
+ @Test
+ public void testNamespaceReplication() throws Exception {
+ Table htab1A = connection1.getTable(tabAName);
+ Table htab2A = connection2.getTable(tabAName);
+
+ Table htab1B = connection1.getTable(tabBName);
+ Table htab2B = connection2.getTable(tabBName);
+
+ admin.peerAdded("2");
+ // add ns1 to peer config which replicate to cluster2
+ ReplicationPeerConfig rpc = admin.getPeerConfig("2");
+ Set<String> namespaces = new HashSet<>();
+ namespaces.add(ns1);
+ rpc.setNamespaces(namespaces);
+ admin.updatePeerConfig("2", rpc);
+ LOG.info("update peer config");
+
+ // Table A can be replicated to cluster2
+ put(htab1A, row, f1Name, f2Name);
+ ensureRowExisted(htab2A, row, f1Name, f2Name);
+ delete(htab1A, row, f1Name, f2Name);
+ ensureRowNotExisted(htab2A, row, f1Name, f2Name);
+
+ // Table B can not be replicated to cluster2
+ put(htab1B, row, f1Name, f2Name);
+ ensureRowNotExisted(htab2B, row, f1Name, f2Name);
+
+ // add ns1:TA => 'f1' and ns2 to peer config which replicate to cluster2
+ rpc = admin.getPeerConfig("2");
+ namespaces = new HashSet<>();
+ namespaces.add(ns2);
+ rpc.setNamespaces(namespaces);
+ Map<TableName, List<String>> tableCfs = new HashMap<>();
+ tableCfs.put(tabAName, new ArrayList<String>());
+ tableCfs.get(tabAName).add("f1");
+ rpc.setTableCFsMap(tableCfs);
+ admin.updatePeerConfig("2", rpc);
+ LOG.info("update peer config");
+
+ // Only family f1 of Table A can replicated to cluster2
+ put(htab1A, row, f1Name, f2Name);
+ ensureRowExisted(htab2A, row, f1Name);
+ delete(htab1A, row, f1Name, f2Name);
+ ensureRowNotExisted(htab2A, row, f1Name);
+
+ // All cfs of table B can replicated to cluster2
+ put(htab1B, row, f1Name, f2Name);
+ ensureRowExisted(htab2B, row, f1Name, f2Name);
+ delete(htab1B, row, f1Name, f2Name);
+ ensureRowNotExisted(htab2B, row, f1Name, f2Name);
+
+ admin.removePeer("2");
+ }
+
+ private void put(Table source, byte[] row, byte[]... families)
+ throws Exception {
+ for (byte[] fam : families) {
+ Put put = new Put(row);
+ put.addColumn(fam, row, val);
+ source.put(put);
+ }
+ }
+
+ private void delete(Table source, byte[] row, byte[]... families)
+ throws Exception {
+ for (byte[] fam : families) {
+ Delete del = new Delete(row);
+ del.addFamily(fam);
+ source.delete(del);
+ }
+ }
+
+ private void ensureRowExisted(Table target, byte[] row, byte[]... families)
+ throws Exception {
+ for (byte[] fam : families) {
+ Get get = new Get(row);
+ get.addFamily(fam);
+ for (int i = 0; i < NB_RETRIES; i++) {
+ if (i == NB_RETRIES - 1) {
+ fail("Waited too much time for put replication");
+ }
+ Result res = target.get(get);
+ if (res.size() == 0) {
+ LOG.info("Row not available");
+ } else {
+ assertEquals(res.size(), 1);
+ assertArrayEquals(res.value(), val);
+ break;
+ }
+ Thread.sleep(SLEEP_TIME);
+ }
+ }
+ }
+
+ private void ensureRowNotExisted(Table target, byte[] row, byte[]... families)
+ throws Exception {
+ for (byte[] fam : families) {
+ Get get = new Get(row);
+ get.addFamily(fam);
+ for (int i = 0; i < NB_RETRIES; i++) {
+ if (i == NB_RETRIES - 1) {
+ fail("Waited too much time for delete replication");
+ }
+ Result res = target.get(get);
+ if (res.size() >= 1) {
+ LOG.info("Row not deleted");
+ } else {
+ break;
+ }
+ Thread.sleep(SLEEP_TIME);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
index 04d9232..3d4062f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
@@ -19,8 +19,10 @@
package org.apache.hadoop.hbase.replication;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.TreeMap;
import org.apache.hadoop.hbase.Cell;
@@ -196,19 +198,22 @@ public class TestReplicationWALEntryFilters {
}
@Test
- public void testTableCfWALEntryFilter() {
+ public void testNamespaceTableCfWALEntryFilter() {
ReplicationPeer peer = mock(ReplicationPeer.class);
+ // 1. no namespaces config and table-cfs config in peer
+ when(peer.getNamespaces()).thenReturn(null);
when(peer.getTableCFs()).thenReturn(null);
Entry userEntry = createEntry(null, a, b, c);
- WALEntryFilter filter = new ChainWALEntryFilter(new TableCfWALEntryFilter(peer));
+ WALEntryFilter filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a,b,c), filter.filter(userEntry));
+ // 2. Only config table-cfs in peer
// empty map
userEntry = createEntry(null, a, b, c);
Map<TableName, List<String>> tableCfs = new HashMap<TableName, List<String>>();
when(peer.getTableCFs()).thenReturn(tableCfs);
- filter = new ChainWALEntryFilter(new TableCfWALEntryFilter(peer));
+ filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(null, filter.filter(userEntry));
// table bar
@@ -216,7 +221,7 @@ public class TestReplicationWALEntryFilters {
tableCfs = new HashMap<TableName, List<String>>();
tableCfs.put(TableName.valueOf("bar"), null);
when(peer.getTableCFs()).thenReturn(tableCfs);
- filter = new ChainWALEntryFilter(new TableCfWALEntryFilter(peer));
+ filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(null, filter.filter(userEntry));
// table foo:a
@@ -224,7 +229,7 @@ public class TestReplicationWALEntryFilters {
tableCfs = new HashMap<TableName, List<String>>();
tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a"));
when(peer.getTableCFs()).thenReturn(tableCfs);
- filter = new ChainWALEntryFilter(new TableCfWALEntryFilter(peer));
+ filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a), filter.filter(userEntry));
// table foo:a,c
@@ -232,8 +237,64 @@ public class TestReplicationWALEntryFilters {
tableCfs = new HashMap<TableName, List<String>>();
tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c"));
when(peer.getTableCFs()).thenReturn(tableCfs);
- filter = new ChainWALEntryFilter(new TableCfWALEntryFilter(peer));
+ filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a,c), filter.filter(userEntry));
+
+ // 3. Only config namespaces in peer
+ when(peer.getTableCFs()).thenReturn(null);
+ // empty set
+ Set<String> namespaces = new HashSet<String>();
+ when(peer.getNamespaces()).thenReturn(namespaces);
+ userEntry = createEntry(null, a, b, c);
+ filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
+ assertEquals(null, filter.filter(userEntry));
+
+ // namespace default
+ namespaces.add("default");
+ when(peer.getNamespaces()).thenReturn(namespaces);
+ userEntry = createEntry(null, a, b, c);
+ filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
+ assertEquals(createEntry(null, a,b,c), filter.filter(userEntry));
+
+ // namespace ns1
+ namespaces = new HashSet<String>();;
+ namespaces.add("ns1");
+ when(peer.getNamespaces()).thenReturn(namespaces);
+ userEntry = createEntry(null, a, b, c);
+ filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
+ assertEquals(null, filter.filter(userEntry));
+
+ // 4. Config namespaces and table-cfs both
+ // Namespaces config should not confict with table-cfs config
+ namespaces = new HashSet<String>();
+ tableCfs = new HashMap<TableName, List<String>>();
+ namespaces.add("ns1");
+ when(peer.getNamespaces()).thenReturn(namespaces);
+ tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c"));
+ when(peer.getTableCFs()).thenReturn(tableCfs);
+ userEntry = createEntry(null, a, b, c);
+ filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
+ assertEquals(createEntry(null, a, c), filter.filter(userEntry));
+
+ namespaces = new HashSet<String>();;
+ tableCfs = new HashMap<TableName, List<String>>();
+ namespaces.add("default");
+ when(peer.getNamespaces()).thenReturn(namespaces);
+ tableCfs.put(TableName.valueOf("ns1:foo"), Lists.newArrayList("a", "c"));
+ when(peer.getTableCFs()).thenReturn(tableCfs);
+ userEntry = createEntry(null, a, b, c);
+ filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
+ assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
+
+ namespaces = new HashSet<String>();;
+ tableCfs = new HashMap<TableName, List<String>>();
+ namespaces.add("ns1");
+ when(peer.getNamespaces()).thenReturn(namespaces);
+ tableCfs.put(TableName.valueOf("bar"), null);
+ when(peer.getTableCFs()).thenReturn(tableCfs);
+ userEntry = createEntry(null, a, b, c);
+ filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
+ assertEquals(null, filter.filter(userEntry));
}
private Entry createEntry(TreeMap<byte[], Integer> scopes, byte[]... kvs) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
index 6759daf..6f5ad56 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint.ReplicateContext;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.replication.regionserver.RegionReplicaReplicationEndpoint.RegionReplicaReplayCallable;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -280,7 +281,9 @@ public class TestRegionReplicaReplicationEndpointNoMaster {
when(context.getMetrics()).thenReturn(mock(MetricsSource.class));
ReplicationPeer mockPeer = mock(ReplicationPeer.class);
+ when(mockPeer.getNamespaces()).thenReturn(null);
when(mockPeer.getTableCFs()).thenReturn(null);
+ when(mockPeer.getPeerConfig()).thenReturn(new ReplicationPeerConfig());
when(context.getReplicationPeer()).thenReturn(mockPeer);
replicator.init(context);
http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-shell/src/main/ruby/hbase/replication_admin.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/hbase/replication_admin.rb b/hbase-shell/src/main/ruby/hbase/replication_admin.rb
index 4de3962..f99ccae 100644
--- a/hbase-shell/src/main/ruby/hbase/replication_admin.rb
+++ b/hbase-shell/src/main/ruby/hbase/replication_admin.rb
@@ -62,6 +62,7 @@ module Hbase
config = args.fetch(CONFIG, nil)
data = args.fetch(DATA, nil)
table_cfs = args.fetch(TABLE_CFS, nil)
+ namespaces = args.fetch(NAMESPACES, nil)
# Create and populate a ReplicationPeerConfig
replication_peer_config = ReplicationPeerConfig.new
@@ -83,6 +84,14 @@ module Hbase
}
end
+ unless namespaces.nil?
+ ns_set = java.util.HashSet.new
+ namespaces.each do |n|
+ ns_set.add(n)
+ end
+ replication_peer_config.set_namespaces(ns_set)
+ end
+
unless table_cfs.nil?
# convert table_cfs to TableName
map = java.util.HashMap.new
@@ -180,12 +189,39 @@ module Hbase
end
@replication_admin.removePeerTableCFs(id, map)
end
+
+ # Set new namespaces config for the specified peer
+ def set_peer_namespaces(id, namespaces)
+ unless namespaces.nil?
+ ns_set = java.util.HashSet.new
+ namespaces.each do |n|
+ ns_set.add(n)
+ end
+ rpc = get_peer_config(id)
+ unless rpc.nil?
+ rpc.setNamespaces(ns_set)
+ @replication_admin.updatePeerConfig(id, rpc)
+ end
+ end
+ end
+
+ # Show the current namespaces config for the specified peer
+ def show_peer_namespaces(peer_config)
+ namespaces = peer_config.get_namespaces
+ if !namespaces.nil?
+ return namespaces.join(';')
+ else
+ return nil
+ end
+ end
+
#----------------------------------------------------------------------------------------------
# Enables a table's replication switch
def enable_tablerep(table_name)
tableName = TableName.valueOf(table_name)
@replication_admin.enableTableRep(tableName)
end
+
#----------------------------------------------------------------------------------------------
# Disables a table's replication switch
def disable_tablerep(table_name)
http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-shell/src/main/ruby/hbase_constants.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/hbase_constants.rb b/hbase-shell/src/main/ruby/hbase_constants.rb
index bc6f37c..c02d5c6 100644
--- a/hbase-shell/src/main/ruby/hbase_constants.rb
+++ b/hbase-shell/src/main/ruby/hbase_constants.rb
@@ -78,6 +78,7 @@ module HBaseConstants
ENDPOINT_CLASSNAME = 'ENDPOINT_CLASSNAME'
CLUSTER_KEY = 'CLUSTER_KEY'
TABLE_CFS = 'TABLE_CFS'
+ NAMESPACES = 'NAMESPACES'
CONFIG = 'CONFIG'
DATA = 'DATA'
http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-shell/src/main/ruby/shell.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell.rb b/hbase-shell/src/main/ruby/shell.rb
index bb6a604..ee508e9 100644
--- a/hbase-shell/src/main/ruby/shell.rb
+++ b/hbase-shell/src/main/ruby/shell.rb
@@ -370,6 +370,7 @@ Shell.load_command_group(
list_peers
enable_peer
disable_peer
+ set_peer_namespaces
show_peer_tableCFs
set_peer_tableCFs
list_replicated_tables
http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-shell/src/main/ruby/shell/commands/add_peer.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell/commands/add_peer.rb b/hbase-shell/src/main/ruby/shell/commands/add_peer.rb
index e9431cf..077bd69 100644
--- a/hbase-shell/src/main/ruby/shell/commands/add_peer.rb
+++ b/hbase-shell/src/main/ruby/shell/commands/add_peer.rb
@@ -27,13 +27,25 @@ must be specified to identify the peer.
For a HBase cluster peer, a cluster key must be provided and is composed like this:
hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
-This gives a full path for HBase to connect to another HBase cluster. An optional parameter for
-table column families identifies which column families will be replicated to the peer cluster.
+This gives a full path for HBase to connect to another HBase cluster.
+An optional parameter for namespaces identifies which namespace's tables will be replicated
+to the peer cluster.
+An optional parameter for table column families identifies which tables and/or column families
+will be replicated to the peer cluster.
+
+Notice: Set a namespace in the peer config means that all tables in this namespace
+will be replicated to the peer cluster. So if you already have set a namespace in peer config,
+then you can't set this namespace's tables in the peer config again.
+
Examples:
hbase> add_peer '1', CLUSTER_KEY => "server1.cie.com:2181:/hbase"
hbase> add_peer '2', CLUSTER_KEY => "zk1,zk2,zk3:2182:/hbase-prod",
TABLE_CFS => { "table1" => [], "table2" => ["cf1"], "table3" => ["cf1", "cf2"] }
+ hbase> add_peer '2', CLUSTER_KEY => "zk1,zk2,zk3:2182:/hbase-prod",
+ NAMESPACES => ["ns1", "ns2", "ns3"]
+ hbase> add_peer '2', CLUSTER_KEY => "zk1,zk2,zk3:2182:/hbase-prod",
+ NAMESPACES => ["ns1", "ns2"], TABLE_CFS => { "ns3:table1" => [], "ns3:table2" => ["cf1"] }
For a custom replication endpoint, the ENDPOINT_CLASSNAME can be provided. Two optional arguments
are DATA and CONFIG which can be specified to set different either the peer_data or configuration
http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-shell/src/main/ruby/shell/commands/list_peers.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell/commands/list_peers.rb b/hbase-shell/src/main/ruby/shell/commands/list_peers.rb
index 72a0704..ed6b575 100644
--- a/hbase-shell/src/main/ruby/shell/commands/list_peers.rb
+++ b/hbase-shell/src/main/ruby/shell/commands/list_peers.rb
@@ -32,12 +32,15 @@ EOF
def command()
peers = replication_admin.list_peers
- formatter.header(["PEER_ID", "CLUSTER_KEY", "STATE", "TABLE_CFS"])
+ formatter.header(["PEER_ID", "CLUSTER_KEY", "ENDPOINT_CLASSNAME",
+ "STATE", "NAMESPACES", "TABLE_CFS"])
peers.entrySet().each do |e|
state = replication_admin.get_peer_state(e.key)
+ namespaces = replication_admin.show_peer_namespaces(e.value)
tableCFs = replication_admin.show_peer_tableCFs(e.key)
- formatter.row([ e.key, e.value, state, tableCFs ])
+ formatter.row([ e.key, e.value.getClusterKey,
+ e.value.getReplicationEndpointImpl, state, namespaces, tableCFs ])
end
formatter.footer()
http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-shell/src/main/ruby/shell/commands/set_peer_namespaces.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell/commands/set_peer_namespaces.rb b/hbase-shell/src/main/ruby/shell/commands/set_peer_namespaces.rb
new file mode 100644
index 0000000..75b3d11
--- /dev/null
+++ b/hbase-shell/src/main/ruby/shell/commands/set_peer_namespaces.rb
@@ -0,0 +1,51 @@
+#
+# Copyright The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Shell
+ module Commands
+ class SetPeerNamespaces< Command
+ def help
+ return <<-EOF
+ Set the replicable namespaces config for the specified peer.
+
+ Set a namespace in the peer config means that all tables in this
+ namespace will be replicated to the peer cluster. So if you already
+ have set a namespace in the peer config, then you can't set this
+ namespace's tables in the peer config again.
+
+ Examples:
+
+ # set namespaces config is null, then the table-cfs config decide
+ # which table to be replicated.
+ hbase> set_peer_namespaces '1', []
+ # set namespaces to be replicable for a peer.
+ # set a namespace in the peer config means that all tables in this
+ # namespace (with replication_scope != 0 ) will be replicated.
+ hbase> set_peer_namespaces '2', ["ns1", "ns2"]
+
+ EOF
+ end
+
+ def command(id, namespaces)
+ replication_admin.set_peer_namespaces(id, namespaces)
+ end
+ end
+ end
+end
http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-shell/src/main/ruby/shell/commands/set_peer_tableCFs.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/shell/commands/set_peer_tableCFs.rb b/hbase-shell/src/main/ruby/shell/commands/set_peer_tableCFs.rb
index b2e823c..4d3c3ec 100644
--- a/hbase-shell/src/main/ruby/shell/commands/set_peer_tableCFs.rb
+++ b/hbase-shell/src/main/ruby/shell/commands/set_peer_tableCFs.rb
@@ -23,11 +23,15 @@ module Shell
class SetPeerTableCFs< Command
def help
return <<-EOF
- Set the replicable table-cf config for the specified peer
+ Set the replicable table-cf config for the specified peer.
+
+ Can't set a table to table-cfs config if it's namespace already was in
+ namespaces config of this peer.
+
Examples:
- # set all tables to be replicable for a peer
- hbase> set_peer_tableCFs '1', ""
+ # set table-cfs config is null, then the namespaces config decide which
+ # table to be replicated.
hbase> set_peer_tableCFs '1'
# set table / table-cf to be replicable for a peer, for a table without
# an explicit column-family list, all replicable column-families (with
http://git-wip-us.apache.org/repos/asf/hbase/blob/1a1003a4/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
index 1d27e67..daa8f96 100644
--- a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
+++ b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb
@@ -121,6 +121,49 @@ module Hbase
command(:remove_peer, @peer_id)
end
+ define_test "add_peer: multiple zk cluster key and namespaces" do
+ cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
+ namespaces = ["ns1", "ns2", "ns3"]
+ namespaces_str = "ns2;ns1;ns3"
+
+ args = { CLUSTER_KEY => cluster_key, NAMESPACES => namespaces }
+ command(:add_peer, @peer_id, args)
+
+ assert_equal(1, command(:list_peers).length)
+ assert(command(:list_peers).key?(@peer_id))
+ peer_config = command(:list_peers).fetch(@peer_id)
+ assert_equal(cluster_key, peer_config.get_cluster_key)
+ assert_equal(namespaces_str,
+ replication_admin.show_peer_namespaces(peer_config))
+
+ # cleanup for future tests
+ command(:remove_peer, @peer_id)
+ end
+
+ define_test "add_peer: multiple zk cluster key and namespaces, table_cfs" do
+ cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
+ namespaces = ["ns1", "ns2"]
+ table_cfs = { "ns3:table1" => [], "ns3:table2" => ["cf1"],
+ "ns3:table3" => ["cf1", "cf2"] }
+ namespaces_str = "ns2;ns1"
+ table_cfs_str = "ns3.table1;ns3.table3:cf1,cf2;ns3.table2:cf1"
+
+ args = { CLUSTER_KEY => cluster_key, NAMESPACES => namespaces,
+ TABLE_CFS => table_cfs }
+ command(:add_peer, @peer_id, args)
+
+ assert_equal(1, command(:list_peers).length)
+ assert(command(:list_peers).key?(@peer_id))
+ peer_config = command(:list_peers).fetch(@peer_id)
+ assert_equal(cluster_key, peer_config.get_cluster_key)
+ assert_equal(namespaces_str,
+ replication_admin.show_peer_namespaces(peer_config))
+ assert_equal(table_cfs_str, command(:show_peer_tableCFs, @peer_id))
+
+ # cleanup for future tests
+ command(:remove_peer, @peer_id)
+ end
+
define_test "add_peer: multiple zk cluster key and table_cfs - peer config" do
cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
table_cfs = { "table1" => [], "table2" => ["cf1"], "table3" => ["cf1", "cf2"] }
@@ -152,6 +195,30 @@ module Hbase
end
end
+ define_test "set_peer_namespaces: works with namespaces array" do
+ cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
+ namespaces = ["ns1", "ns2"]
+ namespaces_str = "ns2;ns1"
+
+ args = { CLUSTER_KEY => cluster_key }
+ command(:add_peer, @peer_id, args)
+
+ # Normally the ReplicationSourceManager will call ReplicationPeer#peer_added
+ # but here we have to do it ourselves
+ replication_admin.peer_added(@peer_id)
+
+ command(:set_peer_namespaces, @peer_id, namespaces)
+
+ assert_equal(1, command(:list_peers).length)
+ assert(command(:list_peers).key?(@peer_id))
+ peer_config = command(:list_peers).fetch(@peer_id)
+ assert_equal(namespaces_str,
+ replication_admin.show_peer_namespaces(peer_config))
+
+ # cleanup for future tests
+ command(:remove_peer, @peer_id)
+ end
+
define_test "get_peer_config: works with simple clusterKey peer" do
cluster_key = "localhost:2181:/hbase-test"
args = { CLUSTER_KEY => cluster_key }
@@ -221,8 +288,8 @@ module Hbase
assert_equal("value2", peer_config.get_configuration.get("config2"))
assert_equal("new_value1", Bytes.to_string(peer_config.get_peer_data.get(Bytes.toBytes("data1"))))
assert_equal("value2", Bytes.to_string(peer_config.get_peer_data.get(Bytes.toBytes("data2"))))
-
end
+
# assert_raise fails on native exceptions - https://jira.codehaus.org/browse/JRUBY-5279
# Can't catch native Java exception with assert_raise in JRuby 1.6.8 as in the test below.
# define_test "add_peer: adding a second peer with same id should error" do