You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2016/03/30 18:03:50 UTC
[48/50] [abbrv] hbase git commit: HBASE-11393 Replication TableCfs
should be a PB object rather than a string
HBASE-11393 Replication TableCfs should be a PB object rather than a string
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7f39baf0
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7f39baf0
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7f39baf0
Branch: refs/heads/hbase-12439
Commit: 7f39baf0f4572ff209837d7de5d37554851ecbb7
Parents: 0520097
Author: chenheng <ch...@apache.org>
Authored: Fri Mar 25 14:16:47 2016 +0800
Committer: chenheng <ch...@apache.org>
Committed: Tue Mar 29 10:25:29 2016 +0800
----------------------------------------------------------------------
.../client/replication/ReplicationAdmin.java | 170 +--
.../replication/ReplicationSerDeHelper.java | 315 +++++
.../hbase/replication/ReplicationPeer.java | 1 +
.../replication/ReplicationPeerConfig.java | 16 +-
.../replication/ReplicationPeerZKImpl.java | 76 +-
.../hbase/replication/ReplicationPeers.java | 19 +-
.../replication/ReplicationPeersZKImpl.java | 163 +--
.../replication/ReplicationStateZKBase.java | 19 +
.../protobuf/generated/ZooKeeperProtos.java | 1155 +++++++++++++++++-
.../src/main/protobuf/ZooKeeper.proto | 6 +
.../org/apache/hadoop/hbase/master/HMaster.java | 10 +-
.../replication/master/TableCFsUpdater.java | 122 ++
.../hbase/client/TestReplicaWithCluster.java | 6 +-
.../replication/TestReplicationAdmin.java | 195 +--
.../cleaner/TestReplicationHFileCleaner.java | 2 +-
.../replication/TestMasterReplication.java | 11 +-
.../replication/TestMultiSlaveReplication.java | 10 +-
.../replication/TestPerTableCFReplication.java | 158 ++-
.../hbase/replication/TestReplicationBase.java | 4 +-
.../replication/TestReplicationSmallTests.java | 5 +-
.../replication/TestReplicationStateBasic.java | 14 +-
.../replication/TestReplicationSyncUpTool.java | 4 +-
.../TestReplicationTrackerZKImpl.java | 10 +-
.../replication/TestReplicationWithTags.java | 6 +-
.../replication/master/TestTableCFsUpdater.java | 210 ++++
...sibilityLabelReplicationWithExpAsString.java | 9 +-
.../TestVisibilityLabelsReplication.java | 5 +-
.../hadoop/hbase/util/TestHBaseFsckOneRS.java | 5 +-
.../src/main/ruby/hbase/replication_admin.rb | 44 +-
.../src/main/ruby/shell/commands/add_peer.rb | 4 +-
.../ruby/shell/commands/append_peer_tableCFs.rb | 2 +-
.../ruby/shell/commands/remove_peer_tableCFs.rb | 4 +-
.../ruby/shell/commands/set_peer_tableCFs.rb | 5 +-
33 files changed, 2309 insertions(+), 476 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/7f39baf0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
index dcf1957..8ee3a22 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
@@ -32,7 +32,6 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -48,6 +47,7 @@ import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
@@ -166,27 +166,6 @@ public class ReplicationAdmin implements Closeable {
}
/**
- * Add a new peer cluster to replicate to.
- * @param id a short name that identifies the cluster
- * @param clusterKey the concatenation of the slave cluster's
- * <code>hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent</code>
- * @throws IllegalStateException if there's already one slave since
- * multi-slave isn't supported yet.
- * @deprecated Use addPeer(String, ReplicationPeerConfig, Map) instead.
- */
- @Deprecated
- public void addPeer(String id, String clusterKey) throws ReplicationException {
- this.addPeer(id, new ReplicationPeerConfig().setClusterKey(clusterKey), null);
- }
-
- @Deprecated
- public void addPeer(String id, String clusterKey, String tableCFs)
- throws ReplicationException {
- this.replicationPeers.addPeer(id,
- new ReplicationPeerConfig().setClusterKey(clusterKey), tableCFs);
- }
-
- /**
* Add a new remote slave cluster for replication.
* @param id a short name that identifies the cluster
* @param peerConfig configuration for the replication slave cluster
@@ -194,83 +173,36 @@ public class ReplicationAdmin implements Closeable {
* A map from tableName to column family names. An empty collection can be passed
* to indicate replicating all column families. Pass null for replicating all table and column
* families
+ * @deprecated as of release 2.0.0, and it will be removed in 3.0.0;
+ * use {@link #addPeer(String, ReplicationPeerConfig)} instead.
*/
+ @Deprecated
public void addPeer(String id, ReplicationPeerConfig peerConfig,
Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException {
- this.replicationPeers.addPeer(id, peerConfig, getTableCfsStr(tableCfs));
- }
-
- public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
- if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) {
- return null;
- }
-
- Map<TableName, List<String>> tableCFsMap = null;
- // TODO: This should be a PB object rather than a String to be parsed!! See HBASE-11393
- // parse out (table, cf-list) pairs from tableCFsConfig
- // format: "table1:cf1,cf2;table2:cfA,cfB"
- String[] tables = tableCFsConfig.split(";");
- for (String tab : tables) {
- // 1 ignore empty table config
- tab = tab.trim();
- if (tab.length() == 0) {
- continue;
- }
- // 2 split to "table" and "cf1,cf2"
- // for each table: "table:cf1,cf2" or "table"
- String[] pair = tab.split(":");
- String tabName = pair[0].trim();
- if (pair.length > 2 || tabName.length() == 0) {
- LOG.error("ignore invalid tableCFs setting: " + tab);
- continue;
- }
-
- // 3 parse "cf1,cf2" part to List<cf>
- List<String> cfs = null;
- if (pair.length == 2) {
- String[] cfsList = pair[1].split(",");
- for (String cf : cfsList) {
- String cfName = cf.trim();
- if (cfName.length() > 0) {
- if (cfs == null) {
- cfs = new ArrayList<String>();
- }
- cfs.add(cfName);
- }
- }
- }
-
- // 4 put <table, List<cf>> to map
- if (tableCFsMap == null) {
- tableCFsMap = new HashMap<TableName, List<String>>();
- }
- tableCFsMap.put(TableName.valueOf(tabName), cfs);
+ if (tableCfs != null) {
+ peerConfig.setTableCFsMap(tableCfs);
}
- return tableCFsMap;
+ this.replicationPeers.addPeer(id, peerConfig);
}
- @VisibleForTesting
- static String getTableCfsStr(Map<TableName, ? extends Collection<String>> tableCfs) {
- String tableCfsStr = null;
- if (tableCfs != null) {
- // Format: table1:cf1,cf2;table2:cfA,cfB;table3
- StringBuilder builder = new StringBuilder();
- for (Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
- if (builder.length() > 0) {
- builder.append(";");
- }
- builder.append(entry.getKey());
- if (entry.getValue() != null && !entry.getValue().isEmpty()) {
- builder.append(":");
- builder.append(StringUtils.join(entry.getValue(), ","));
- }
- }
- tableCfsStr = builder.toString();
- }
- return tableCfsStr;
+ /**
+ * Add a new remote slave cluster for replication.
+ * @param id a short name that identifies the cluster
+ * @param peerConfig configuration for the replication slave cluster
+ */
+ public void addPeer(String id, ReplicationPeerConfig peerConfig) throws ReplicationException {
+ this.replicationPeers.addPeer(id, peerConfig);
}
/**
+ * @deprecated as of release 2.0.0, and it will be removed in 3.0.0
+ * */
+ @Deprecated
+ public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
+ return ReplicationSerDeHelper.parseTableCFsFromConfig(tableCFsConfig);
+ }
+
+ /**
* Removes a peer cluster and stops the replication to it.
* @param id a short name that identifies the cluster
*/
@@ -302,22 +234,6 @@ public class ReplicationAdmin implements Closeable {
return this.replicationPeers.getAllPeerIds().size();
}
- /**
- * Map of this cluster's peers for display.
- * @return A map of peer ids to peer cluster keys
- * @deprecated use {@link #listPeerConfigs()}
- */
- @Deprecated
- public Map<String, String> listPeers() {
- Map<String, ReplicationPeerConfig> peers = this.listPeerConfigs();
- Map<String, String> ret = new HashMap<String, String>(peers.size());
-
- for (Map.Entry<String, ReplicationPeerConfig> entry : peers.entrySet()) {
- ret.put(entry.getKey(), entry.getValue().getClusterKey());
- }
- return ret;
- }
-
public Map<String, ReplicationPeerConfig> listPeerConfigs() {
return this.replicationPeers.getAllPeerConfigs();
}
@@ -329,19 +245,12 @@ public class ReplicationAdmin implements Closeable {
/**
* Get the replicable table-cf config of the specified peer.
* @param id a short name that identifies the cluster
- */
- public String getPeerTableCFs(String id) throws ReplicationException {
- return this.replicationPeers.getPeerTableCFsConfig(id);
- }
-
- /**
- * Set the replicable table-cf config of the specified peer
- * @param id a short name that identifies the cluster
- * @deprecated use {@link #setPeerTableCFs(String, Map)}
- */
+ * @deprecated as of release 2.0.0, and it will be removed in 3.0.0;
+ * use {@link #getPeerConfig(String)} instead.
+ * */
@Deprecated
- public void setPeerTableCFs(String id, String tableCFs) throws ReplicationException {
- this.replicationPeers.setPeerTableCFsConfig(id, tableCFs);
+ public String getPeerTableCFs(String id) throws ReplicationException {
+ return ReplicationSerDeHelper.convertToString(this.replicationPeers.getPeerTableCFsConfig(id));
}
/**
@@ -349,9 +258,12 @@ public class ReplicationAdmin implements Closeable {
* @param id a short that identifies the cluster
* @param tableCfs table-cfs config str
* @throws ReplicationException
+ * @deprecated as of release 2.0.0, and it will be removed in 3.0.0;
+ * use {@link #appendPeerTableCFs(String, Map)} instead.
*/
+ @Deprecated
public void appendPeerTableCFs(String id, String tableCfs) throws ReplicationException {
- appendPeerTableCFs(id, parseTableCFsFromConfig(tableCfs));
+ appendPeerTableCFs(id, ReplicationSerDeHelper.parseTableCFsFromConfig(tableCfs));
}
/**
@@ -365,12 +277,11 @@ public class ReplicationAdmin implements Closeable {
if (tableCfs == null) {
throw new ReplicationException("tableCfs is null");
}
- Map<TableName, List<String>> preTableCfs = parseTableCFsFromConfig(getPeerTableCFs(id));
+ Map<TableName, List<String>> preTableCfs = this.replicationPeers.getPeerTableCFsConfig(id);
if (preTableCfs == null) {
setPeerTableCFs(id, tableCfs);
return;
}
-
for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
TableName table = entry.getKey();
Collection<String> appendCfs = entry.getValue();
@@ -382,6 +293,7 @@ public class ReplicationAdmin implements Closeable {
Set<String> cfSet = new HashSet<String>(cfs);
cfSet.addAll(appendCfs);
preTableCfs.put(table, Lists.newArrayList(cfSet));
+
}
} else {
if (appendCfs == null || appendCfs.isEmpty()) {
@@ -399,9 +311,12 @@ public class ReplicationAdmin implements Closeable {
* @param id a short name that identifies the cluster
* @param tableCf table-cfs config str
* @throws ReplicationException
+ * @deprecated as of release 2.0.0, and it will be removed in 3.0.0;
+ * use {@link #removePeerTableCFs(String, Map)} instead.
*/
+ @Deprecated
public void removePeerTableCFs(String id, String tableCf) throws ReplicationException {
- removePeerTableCFs(id, parseTableCFsFromConfig(tableCf));
+ removePeerTableCFs(id, ReplicationSerDeHelper.parseTableCFsFromConfig(tableCf));
}
/**
@@ -415,12 +330,12 @@ public class ReplicationAdmin implements Closeable {
if (tableCfs == null) {
throw new ReplicationException("tableCfs is null");
}
-
- Map<TableName, List<String>> preTableCfs = parseTableCFsFromConfig(getPeerTableCFs(id));
+ Map<TableName, List<String>> preTableCfs = this.replicationPeers.getPeerTableCFsConfig(id);
if (preTableCfs == null) {
throw new ReplicationException("Table-Cfs for peer" + id + " is null");
}
for (Map.Entry<TableName, ? extends Collection<String>> entry: tableCfs.entrySet()) {
+
TableName table = entry.getKey();
Collection<String> removeCfs = entry.getValue();
if (preTableCfs.containsKey(table)) {
@@ -444,6 +359,7 @@ public class ReplicationAdmin implements Closeable {
}
} else {
throw new ReplicationException("No table: " + table + " in table-cfs config of peer: " + id);
+
}
}
setPeerTableCFs(id, preTableCfs);
@@ -459,7 +375,7 @@ public class ReplicationAdmin implements Closeable {
*/
public void setPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
throws ReplicationException {
- this.replicationPeers.setPeerTableCFsConfig(id, getTableCfsStr(tableCfs));
+ this.replicationPeers.setPeerTableCFsConfig(id, tableCfs);
}
/**
@@ -645,8 +561,8 @@ public class ReplicationAdmin implements Closeable {
try {
Pair<ReplicationPeerConfig, Configuration> pair = this.replicationPeers.getPeerConf(peerId);
Configuration peerConf = pair.getSecond();
- ReplicationPeer peer = new ReplicationPeerZKImpl(peerConf, peerId, pair.getFirst(),
- parseTableCFsFromConfig(this.getPeerTableCFs(peerId)));
+ ReplicationPeer peer = new ReplicationPeerZKImpl(zkw, pair.getSecond(),
+ peerId, pair.getFirst(), this.connection);
listOfPeers.add(peer);
} catch (ReplicationException e) {
LOG.warn("Failed to get valid replication peers. "
http://git-wip-us.apache.org/repos/asf/hbase/blob/7f39baf0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java
new file mode 100644
index 0000000..9682f89
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java
@@ -0,0 +1,315 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.replication;
+
+import com.google.protobuf.ByteString;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Strings;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.ArrayList;
+
+/**
+ * Helper for TableCFs Operations.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+public final class ReplicationSerDeHelper {
+
+ private static final Log LOG = LogFactory.getLog(ReplicationSerDeHelper.class);
+
+ private ReplicationSerDeHelper() {}
+
+ /** convert map to TableCFs Object */
+ public static ZooKeeperProtos.TableCF[] convert(
+ Map<TableName, ? extends Collection<String>> tableCfs) {
+ if (tableCfs == null) {
+ return null;
+ }
+ List<ZooKeeperProtos.TableCF> tableCFList = new ArrayList<>();
+ ZooKeeperProtos.TableCF.Builder tableCFBuilder = ZooKeeperProtos.TableCF.newBuilder();
+ for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
+ tableCFBuilder.clear();
+ tableCFBuilder.setTableName(ProtobufUtil.toProtoTableName(entry.getKey()));
+ Collection<String> v = entry.getValue();
+ if (v != null && !v.isEmpty()) {
+ for (String value : entry.getValue()) {
+ tableCFBuilder.addFamilies(ByteString.copyFromUtf8(value));
+ }
+ }
+ tableCFList.add(tableCFBuilder.build());
+ }
+ return tableCFList.toArray(new ZooKeeperProtos.TableCF[tableCFList.size()]);
+ }
+
+ public static String convertToString(Map<TableName, ? extends Collection<String>> tableCfs) {
+ if (tableCfs == null) {
+ return null;
+ }
+ return convert(convert(tableCfs));
+ }
+
+ /**
+ * Convert string to TableCFs Object.
+ * This is only for reading TableCFs information from the TableCF node.
+ * Input String Format: ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;ns3.table3.
+ * */
+ public static ZooKeeperProtos.TableCF[] convert(String tableCFsConfig) {
+ if (tableCFsConfig == null || tableCFsConfig.trim().length() == 0) {
+ return null;
+ }
+ List<ZooKeeperProtos.TableCF> tableCFList = new ArrayList<>();
+ ZooKeeperProtos.TableCF.Builder tableCFBuilder = ZooKeeperProtos.TableCF.newBuilder();
+
+ String[] tables = tableCFsConfig.split(";");
+ for (String tab : tables) {
+ // 1 ignore empty table config
+ tab = tab.trim();
+ if (tab.length() == 0) {
+ continue;
+ }
+ // 2 split to "table" and "cf1,cf2"
+ // for each table: "table:cf1,cf2" or "table"
+ String[] pair = tab.split(":");
+ String tabName = pair[0].trim();
+ if (pair.length > 2 || tabName.length() == 0) {
+ LOG.info("incorrect format:" + tableCFsConfig);
+ continue;
+ }
+
+ tableCFBuilder.clear();
+ // split namespace from tableName
+ String ns = "default";
+ String tName = tabName;
+ String[] dbs = tabName.split("\\.");
+ if (dbs != null && dbs.length == 2) {
+ ns = dbs[0];
+ tName = dbs[1];
+ }
+ tableCFBuilder.setTableName(
+ ProtobufUtil.toProtoTableName(TableName.valueOf(ns, tName)));
+
+ // 3 parse "cf1,cf2" part to List<cf>
+ if (pair.length == 2) {
+ String[] cfsList = pair[1].split(",");
+ for (String cf : cfsList) {
+ String cfName = cf.trim();
+ if (cfName.length() > 0) {
+ tableCFBuilder.addFamilies(ByteString.copyFromUtf8(cfName));
+ }
+ }
+ }
+ tableCFList.add(tableCFBuilder.build());
+ }
+ return tableCFList.toArray(new ZooKeeperProtos.TableCF[tableCFList.size()]);
+ }
+
+ /**
+ * Convert TableCFs Object to String.
+ * Output String Format: ns1.table1:cf1,cf2;ns2.table2:cfA,cfB;table3
+ * */
+ public static String convert(ZooKeeperProtos.TableCF[] tableCFs) {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0, n = tableCFs.length; i < n; i++) {
+ ZooKeeperProtos.TableCF tableCF = tableCFs[i];
+ String namespace = tableCF.getTableName().getNamespace().toStringUtf8();
+ if (!Strings.isEmpty(namespace)) {
+ sb.append(namespace).append(".").
+ append(tableCF.getTableName().getQualifier().toStringUtf8())
+ .append(":");
+ } else {
+ sb.append(tableCF.getTableName().toString()).append(":");
+ }
+ for (int j = 0; j < tableCF.getFamiliesCount(); j++) {
+ sb.append(tableCF.getFamilies(j).toStringUtf8()).append(",");
+ }
+ sb.deleteCharAt(sb.length() - 1).append(";");
+ }
+ if (sb.length() > 0) {
+ sb.deleteCharAt(sb.length() - 1);
+ }
+ return sb.toString();
+ }
+
+ /**
+ * Get the TableCF for the given table from TableCFs; returns null if it does not exist.
+ * */
+ public static ZooKeeperProtos.TableCF getTableCF(ZooKeeperProtos.TableCF[] tableCFs,
+ String table) {
+ for (int i = 0, n = tableCFs.length; i < n; i++) {
+ ZooKeeperProtos.TableCF tableCF = tableCFs[i];
+ if (tableCF.getTableName().getQualifier().toStringUtf8().equals(table)) {
+ return tableCF;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Parse bytes into TableCFs.
+ * It is used for backward compatibility.
+ * Old format bytes have no PB_MAGIC Header
+ * */
+ public static ZooKeeperProtos.TableCF[] parseTableCFs(byte[] bytes) throws IOException {
+ if (bytes == null) {
+ return null;
+ }
+ return ReplicationSerDeHelper.convert(Bytes.toString(bytes));
+ }
+
+ /**
+ * Convert tableCFs string into Map.
+ * */
+ public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
+ ZooKeeperProtos.TableCF[] tableCFs = convert(tableCFsConfig);
+ return convert2Map(tableCFs);
+ }
+
+ /**
+ * Convert tableCFs Object to Map.
+ * */
+ public static Map<TableName, List<String>> convert2Map(ZooKeeperProtos.TableCF[] tableCFs) {
+ if (tableCFs == null || tableCFs.length == 0) {
+ return null;
+ }
+ Map<TableName, List<String>> tableCFsMap = new HashMap<TableName, List<String>>();
+ for (int i = 0, n = tableCFs.length; i < n; i++) {
+ ZooKeeperProtos.TableCF tableCF = tableCFs[i];
+ List<String> families = new ArrayList<>();
+ for (int j = 0, m = tableCF.getFamiliesCount(); j < m; j++) {
+ families.add(tableCF.getFamilies(j).toStringUtf8());
+ }
+ if (families.size() > 0) {
+ tableCFsMap.put(ProtobufUtil.toTableName(tableCF.getTableName()), families);
+ } else {
+ tableCFsMap.put(ProtobufUtil.toTableName(tableCF.getTableName()), null);
+ }
+ }
+
+ return tableCFsMap;
+ }
+
+ /**
+ * @param bytes Content of a peer znode.
+ * @return ReplicationPeerConfig parsed from the passed bytes.
+ * @throws DeserializationException
+ */
+ public static ReplicationPeerConfig parsePeerFrom(final byte[] bytes)
+ throws DeserializationException {
+ if (ProtobufUtil.isPBMagicPrefix(bytes)) {
+ int pblen = ProtobufUtil.lengthOfPBMagic();
+ ZooKeeperProtos.ReplicationPeer.Builder builder =
+ ZooKeeperProtos.ReplicationPeer.newBuilder();
+ ZooKeeperProtos.ReplicationPeer peer;
+ try {
+ ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
+ peer = builder.build();
+ } catch (IOException e) {
+ throw new DeserializationException(e);
+ }
+ return convert(peer);
+ } else {
+ if (bytes.length > 0) {
+ return new ReplicationPeerConfig().setClusterKey(Bytes.toString(bytes));
+ }
+ return new ReplicationPeerConfig().setClusterKey("");
+ }
+ }
+
+ public static ReplicationPeerConfig convert(ZooKeeperProtos.ReplicationPeer peer) {
+ ReplicationPeerConfig peerConfig = new ReplicationPeerConfig();
+ if (peer.hasClusterkey()) {
+ peerConfig.setClusterKey(peer.getClusterkey());
+ }
+ if (peer.hasReplicationEndpointImpl()) {
+ peerConfig.setReplicationEndpointImpl(peer.getReplicationEndpointImpl());
+ }
+
+ for (HBaseProtos.BytesBytesPair pair : peer.getDataList()) {
+ peerConfig.getPeerData().put(pair.getFirst().toByteArray(), pair.getSecond().toByteArray());
+ }
+
+ for (HBaseProtos.NameStringPair pair : peer.getConfigurationList()) {
+ peerConfig.getConfiguration().put(pair.getName(), pair.getValue());
+ }
+ Map<TableName, ? extends Collection<String>> tableCFsMap = convert2Map(
+ peer.getTableCfsList().toArray(new ZooKeeperProtos.TableCF[peer.getTableCfsCount()]));
+ if (tableCFsMap != null) {
+ peerConfig.setTableCFsMap(tableCFsMap);
+ }
+ return peerConfig;
+ }
+
+ public static ZooKeeperProtos.ReplicationPeer convert(ReplicationPeerConfig peerConfig) {
+ ZooKeeperProtos.ReplicationPeer.Builder builder = ZooKeeperProtos.ReplicationPeer.newBuilder();
+ if (peerConfig.getClusterKey() != null) {
+ builder.setClusterkey(peerConfig.getClusterKey());
+ }
+ if (peerConfig.getReplicationEndpointImpl() != null) {
+ builder.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl());
+ }
+
+ for (Map.Entry<byte[], byte[]> entry : peerConfig.getPeerData().entrySet()) {
+ builder.addData(HBaseProtos.BytesBytesPair.newBuilder()
+ .setFirst(ByteString.copyFrom(entry.getKey()))
+ .setSecond(ByteString.copyFrom(entry.getValue()))
+ .build());
+ }
+
+ for (Map.Entry<String, String> entry : peerConfig.getConfiguration().entrySet()) {
+ builder.addConfiguration(HBaseProtos.NameStringPair.newBuilder()
+ .setName(entry.getKey())
+ .setValue(entry.getValue())
+ .build());
+ }
+ ZooKeeperProtos.TableCF[] tableCFs = convert(peerConfig.getTableCFsMap());
+ if (tableCFs != null) {
+ for (int i = 0; i < tableCFs.length; i++) {
+ builder.addTableCfs(tableCFs[i]);
+ }
+ }
+ return builder.build();
+ }
+
+ /**
+ * @param peerConfig
+ * @return Serialized protobuf of <code>peerConfig</code> with pb magic prefix prepended suitable
+ * for use as content of a peer znode; i.e. the content of PEER_ID znode under
+ * /hbase/replication/peers/PEER_ID
+ */
+ public static byte[] toByteArray(final ReplicationPeerConfig peerConfig) {
+ byte[] bytes = convert(peerConfig).toByteArray();
+ return ProtobufUtil.prependPBMagic(bytes);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/7f39baf0/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
index b8b5b22..920eea6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
/**
* ReplicationPeer manages enabled / disabled state for the peer.
*/
http://git-wip-us.apache.org/repos/asf/hbase/blob/7f39baf0/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
index 043b38f..8d05fa0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
@@ -18,12 +18,16 @@
package org.apache.hadoop.hbase.replication;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.util.Bytes;
/**
@@ -37,6 +41,7 @@ public class ReplicationPeerConfig {
private String replicationEndpointImpl;
private final Map<byte[], byte[]> peerData;
private final Map<String, String> configuration;
+ private Map<TableName, ? extends Collection<String>> tableCFsMap = null;
public ReplicationPeerConfig() {
@@ -78,10 +83,19 @@ public class ReplicationPeerConfig {
return configuration;
}
+ public Map<TableName, List<String>> getTableCFsMap() {
+ return (Map<TableName, List<String>>) tableCFsMap;
+ }
+
+ public void setTableCFsMap(Map<TableName,? extends Collection<String>> tableCFsMap) {
+ this.tableCFsMap = tableCFsMap;
+ }
+
@Override
public String toString() {
StringBuilder builder = new StringBuilder("clusterKey=").append(clusterKey).append(",");
- builder.append("replicationEndpointImpl=").append(replicationEndpointImpl);
+ builder.append("replicationEndpointImpl=").append(replicationEndpointImpl).append(",")
+ .append("tableCFs=").append(tableCFsMap.toString());
return builder.toString();
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/7f39baf0/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
index 39f6ebc..f7a2411 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
@@ -30,11 +30,10 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
+import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -42,17 +41,18 @@ import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
@InterfaceAudience.Private
-public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closeable {
+public class ReplicationPeerZKImpl extends ReplicationStateZKBase
+ implements ReplicationPeer, Abortable, Closeable {
private static final Log LOG = LogFactory.getLog(ReplicationPeerZKImpl.class);
- private final ReplicationPeerConfig peerConfig;
+ private ReplicationPeerConfig peerConfig;
private final String id;
private volatile PeerState peerState;
private volatile Map<TableName, List<String>> tableCFs = new HashMap<TableName, List<String>>();
private final Configuration conf;
-
private PeerStateTracker peerStateTracker;
- private TableCFsTracker tableCFsTracker;
+ private PeerConfigTracker peerConfigTracker;
+
/**
* Constructor that takes all the objects required to communicate with the specified peer, except
@@ -61,39 +61,25 @@ public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closea
* @param id string representation of this peer's identifier
* @param peerConfig configuration for the replication peer
*/
- public ReplicationPeerZKImpl(Configuration conf, String id, ReplicationPeerConfig peerConfig)
+ public ReplicationPeerZKImpl(ZooKeeperWatcher zkWatcher, Configuration conf,
+ String id, ReplicationPeerConfig peerConfig,
+ Abortable abortable)
throws ReplicationException {
+ super(zkWatcher, conf, abortable);
this.conf = conf;
this.peerConfig = peerConfig;
this.id = id;
}
-
- /**
- * Constructor that takes all the objects required to communicate with the specified peer, except
- * for the region server addresses.
- * @param conf configuration object to this peer
- * @param id string representation of this peer's identifier
- * @param peerConfig configuration for the replication peer
- * @param tableCFs table-cf configuration for this peer
- */
- public ReplicationPeerZKImpl(Configuration conf, String id, ReplicationPeerConfig peerConfig,
- Map<TableName, List<String>> tableCFs) throws ReplicationException {
- this.conf = conf;
- this.peerConfig = peerConfig;
- this.id = id;
- this.tableCFs = tableCFs;
- }
/**
* start a state tracker to check whether this peer is enabled or not
*
- * @param zookeeper zk watcher for the local cluster
* @param peerStateNode path to zk node which stores peer state
* @throws KeeperException
*/
- public void startStateTracker(ZooKeeperWatcher zookeeper, String peerStateNode)
+ public void startStateTracker(String peerStateNode)
throws KeeperException {
- ensurePeerEnabled(zookeeper, peerStateNode);
+ ensurePeerEnabled(peerStateNode);
this.peerStateTracker = new PeerStateTracker(peerStateNode, zookeeper, this);
this.peerStateTracker.start();
try {
@@ -112,22 +98,26 @@ public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closea
/**
* start a table-cfs tracker to listen the (table, cf-list) map change
- *
- * @param zookeeper zk watcher for the local cluster
- * @param tableCFsNode path to zk node which stores table-cfs
+ * @param peerConfigNode path to zk node which stores the serialized peer config
* @throws KeeperException
*/
- public void startTableCFsTracker(ZooKeeperWatcher zookeeper, String tableCFsNode)
+ public void startPeerConfigTracker(String peerConfigNode)
throws KeeperException {
- this.tableCFsTracker = new TableCFsTracker(tableCFsNode, zookeeper,
+ this.peerConfigTracker = new PeerConfigTracker(peerConfigNode, zookeeper,
this);
- this.tableCFsTracker.start();
- this.readTableCFsZnode();
+ this.peerConfigTracker.start();
+ this.readPeerConfig();
}
- private void readTableCFsZnode() {
- String currentTableCFs = Bytes.toString(tableCFsTracker.getData(false));
- this.tableCFs = ReplicationAdmin.parseTableCFsFromConfig(currentTableCFs);
+ private void readPeerConfig() {
+ try {
+ byte[] data = peerConfigTracker.getData(false);
+ if (data != null) {
+ this.peerConfig = ReplicationSerDeHelper.parsePeerFrom(data);
+ }
+ } catch (DeserializationException e) {
+ LOG.error("", e);
+ }
}
@Override
@@ -168,6 +158,7 @@ public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closea
*/
@Override
public Map<TableName, List<String>> getTableCFs() {
+ this.tableCFs = peerConfig.getTableCFsMap();
return this.tableCFs;
}
@@ -223,13 +214,12 @@ public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closea
/**
* Utility method to ensure an ENABLED znode is in place; if not present, we create it.
- * @param zookeeper
* @param path Path to znode to check
* @return True if we created the znode.
* @throws NodeExistsException
* @throws KeeperException
*/
- private static boolean ensurePeerEnabled(final ZooKeeperWatcher zookeeper, final String path)
+ private boolean ensurePeerEnabled(final String path)
throws NodeExistsException, KeeperException {
if (ZKUtil.checkExists(zookeeper, path) == -1) {
// There is a race b/w PeerWatcher and ReplicationZookeeper#add method to create the
@@ -266,20 +256,20 @@ public class ReplicationPeerZKImpl implements ReplicationPeer, Abortable, Closea
}
/**
- * Tracker for (table, cf-list) map of this peer
+ * Tracker for the peer config znode of this peer
*/
- public class TableCFsTracker extends ZooKeeperNodeTracker {
+ public class PeerConfigTracker extends ZooKeeperNodeTracker {
- public TableCFsTracker(String tableCFsZNode, ZooKeeperWatcher watcher,
+ public PeerConfigTracker(String peerConfigNode, ZooKeeperWatcher watcher,
Abortable abortable) {
- super(watcher, tableCFsZNode, abortable);
+ super(watcher, peerConfigNode, abortable);
}
@Override
public synchronized void nodeCreated(String path) {
if (path.equals(node)) {
super.nodeCreated(path);
- readTableCFsZnode();
+ readPeerConfig();
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/7f39baf0/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
index 8bf21d5..1961a65 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
@@ -18,6 +18,7 @@
*/
package org.apache.hadoop.hbase.replication;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -49,10 +50,8 @@ public interface ReplicationPeers {
* Add a new remote slave cluster for replication.
* @param peerId a short that identifies the cluster
* @param peerConfig configuration for the replication slave cluster
- * @param tableCFs the table and column-family list which will be replicated for this peer or null
- * for all table and column families
*/
- void addPeer(String peerId, ReplicationPeerConfig peerConfig, String tableCFs)
+ void addPeer(String peerId, ReplicationPeerConfig peerConfig)
throws ReplicationException;
/**
@@ -81,21 +80,17 @@ public interface ReplicationPeers {
* Get the table and column-family list string of the peer from ZK.
* @param peerId a short that identifies the cluster
*/
- public String getPeerTableCFsConfig(String peerId) throws ReplicationException;
+ public Map<TableName, List<String>> getPeerTableCFsConfig(String peerId)
+ throws ReplicationException;
/**
* Set the table and column-family list string of the peer to ZK.
* @param peerId a short that identifies the cluster
* @param tableCFs the table and column-family list which will be replicated for this peer
*/
- public void setPeerTableCFsConfig(String peerId, String tableCFs) throws ReplicationException;
-
- /**
- * Get the table and column-family-list map of the peer.
- * @param peerId a short that identifies the cluster
- * @return the table and column-family list which will be replicated for this peer
- */
- public Map<TableName, List<String>> getTableCFs(String peerId);
+ public void setPeerTableCFsConfig(String peerId,
+ Map<TableName, ? extends Collection<String>> tableCFs)
+ throws ReplicationException;
/**
* Returns the ReplicationPeer
http://git-wip-us.apache.org/repos/asf/hbase/blob/7f39baf0/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
index e14f2c6..367c688 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -29,19 +30,16 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.CompoundConfiguration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BytesBytesPair;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
@@ -49,8 +47,6 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
-import com.google.protobuf.ByteString;
-
/**
* This class provides an implementation of the ReplicationPeers interface using Zookeeper. The
* peers znode contains a list of all peer replication clusters and the current replication state of
@@ -82,15 +78,15 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
// Map of peer clusters keyed by their id
private Map<String, ReplicationPeerZKImpl> peerClusters;
- private final String tableCFsNodeName;
private final ReplicationQueuesClient queuesClient;
+ private Abortable abortable;
private static final Log LOG = LogFactory.getLog(ReplicationPeersZKImpl.class);
public ReplicationPeersZKImpl(final ZooKeeperWatcher zk, final Configuration conf,
final ReplicationQueuesClient queuesClient, Abortable abortable) {
super(zk, conf, abortable);
- this.tableCFsNodeName = conf.get("zookeeper.znode.replication.peers.tableCFs", "tableCFs");
+ this.abortable = abortable;
this.peerClusters = new ConcurrentHashMap<String, ReplicationPeerZKImpl>();
this.queuesClient = queuesClient;
}
@@ -108,7 +104,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
}
@Override
- public void addPeer(String id, ReplicationPeerConfig peerConfig, String tableCFs)
+ public void addPeer(String id, ReplicationPeerConfig peerConfig)
throws ReplicationException {
try {
if (peerExists(id)) {
@@ -136,18 +132,15 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
}
List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
- ZKUtilOp op1 = ZKUtilOp.createAndFailSilent(ZKUtil.joinZNode(this.peersZNode, id),
- toByteArray(peerConfig));
+ ZKUtilOp op1 = ZKUtilOp.createAndFailSilent(getPeerNode(id),
+ ReplicationSerDeHelper.toByteArray(peerConfig));
// There is a race (if hbase.zookeeper.useMulti is false)
// b/w PeerWatcher and ReplicationZookeeper#add method to create the
// peer-state znode. This happens while adding a peer
// The peer state data is set as "ENABLED" by default.
ZKUtilOp op2 = ZKUtilOp.createAndFailSilent(getPeerStateNode(id), ENABLED_ZNODE_BYTES);
- String tableCFsStr = (tableCFs == null) ? "" : tableCFs;
- ZKUtilOp op3 = ZKUtilOp.createAndFailSilent(getTableCFsNode(id), Bytes.toBytes(tableCFsStr));
listOfOps.add(op1);
listOfOps.add(op2);
- listOfOps.add(op3);
ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
// A peer is enabled by default
} catch (KeeperException e) {
@@ -192,13 +185,17 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
}
@Override
- public String getPeerTableCFsConfig(String id) throws ReplicationException {
+ public Map<TableName, List<String>> getPeerTableCFsConfig(String id) throws ReplicationException {
try {
if (!peerExists(id)) {
throw new IllegalArgumentException("peer " + id + " doesn't exist");
}
try {
- return Bytes.toString(ZKUtil.getData(this.zookeeper, getTableCFsNode(id)));
+ ReplicationPeerConfig rpc = getReplicationPeerConfig(id);
+ if (rpc == null) {
+ throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id);
+ }
+ return rpc.getTableCFsMap();
} catch (Exception e) {
throw new ReplicationException(e);
}
@@ -208,35 +205,29 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
}
@Override
- public void setPeerTableCFsConfig(String id, String tableCFsStr) throws ReplicationException {
+ public void setPeerTableCFsConfig(String id,
+ Map<TableName, ? extends Collection<String>> tableCFs)
+ throws ReplicationException {
try {
if (!peerExists(id)) {
throw new IllegalArgumentException("Cannot set peer tableCFs because id=" + id
+ " does not exist.");
}
- String tableCFsZKNode = getTableCFsNode(id);
- byte[] tableCFs = Bytes.toBytes(tableCFsStr);
- if (ZKUtil.checkExists(this.zookeeper, tableCFsZKNode) != -1) {
- ZKUtil.setData(this.zookeeper, tableCFsZKNode, tableCFs);
- } else {
- ZKUtil.createAndWatch(this.zookeeper, tableCFsZKNode, tableCFs);
+ ReplicationPeerConfig rpc = getReplicationPeerConfig(id);
+ if (rpc == null) {
+ throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id);
}
- LOG.info("Peer tableCFs with id= " + id + " is now " + tableCFsStr);
+ rpc.setTableCFsMap(tableCFs);
+ ZKUtil.setData(this.zookeeper, getPeerNode(id),
+ ReplicationSerDeHelper.toByteArray(rpc));
+ LOG.info("Peer tableCFs with id= " + id + " is now " +
+ ReplicationSerDeHelper.convertToString(tableCFs));
} catch (KeeperException e) {
throw new ReplicationException("Unable to change tableCFs of the peer with id=" + id, e);
}
}
@Override
- public Map<TableName, List<String>> getTableCFs(String id) throws IllegalArgumentException {
- ReplicationPeer replicationPeer = this.peerClusters.get(id);
- if (replicationPeer == null) {
- throw new IllegalArgumentException("Peer with id= " + id + " is not connected");
- }
- return replicationPeer.getTableCFs();
- }
-
- @Override
public boolean getStatusOfPeer(String id) {
ReplicationPeer replicationPeer = this.peerClusters.get(id);
if (replicationPeer == null) {
@@ -306,7 +297,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
@Override
public ReplicationPeerConfig getReplicationPeerConfig(String peerId)
throws ReplicationException {
- String znode = ZKUtil.joinZNode(this.peersZNode, peerId);
+ String znode = getPeerNode(peerId);
byte[] data = null;
try {
data = ZKUtil.getData(this.zookeeper, znode);
@@ -325,7 +316,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
}
try {
- return parsePeerFrom(data);
+ return ReplicationSerDeHelper.parsePeerFrom(data);
} catch (DeserializationException e) {
LOG.warn("Failed to parse cluster key from peerId=" + peerId
+ ", specifically the content from the following znode: " + znode);
@@ -438,14 +429,6 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
return true;
}
- private String getTableCFsNode(String id) {
- return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.tableCFsNodeName));
- }
-
- private String getPeerStateNode(String id) {
- return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.peerStateNodeName));
- }
-
/**
* Update the state znode of a peer cluster.
* @param id
@@ -486,16 +469,17 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
}
Configuration peerConf = pair.getSecond();
- ReplicationPeerZKImpl peer = new ReplicationPeerZKImpl(peerConf, peerId, pair.getFirst());
+ ReplicationPeerZKImpl peer = new ReplicationPeerZKImpl(zookeeper,
+ peerConf, peerId, pair.getFirst(), abortable);
try {
- peer.startStateTracker(this.zookeeper, this.getPeerStateNode(peerId));
+ peer.startStateTracker(this.getPeerStateNode(peerId));
} catch (KeeperException e) {
throw new ReplicationException("Error starting the peer state tracker for peerId=" +
peerId, e);
}
try {
- peer.startTableCFsTracker(this.zookeeper, this.getTableCFsNode(peerId));
+ peer.startPeerConfigTracker(this.getPeerNode(peerId));
} catch (KeeperException e) {
throw new ReplicationException("Error starting the peer tableCFs tracker for peerId=" +
peerId, e);
@@ -504,89 +488,6 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
return peer;
}
- /**
- * @param bytes Content of a peer znode.
- * @return ClusterKey parsed from the passed bytes.
- * @throws DeserializationException
- */
- private static ReplicationPeerConfig parsePeerFrom(final byte[] bytes)
- throws DeserializationException {
- if (ProtobufUtil.isPBMagicPrefix(bytes)) {
- int pblen = ProtobufUtil.lengthOfPBMagic();
- ZooKeeperProtos.ReplicationPeer.Builder builder =
- ZooKeeperProtos.ReplicationPeer.newBuilder();
- ZooKeeperProtos.ReplicationPeer peer;
- try {
- ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
- peer = builder.build();
- } catch (IOException e) {
- throw new DeserializationException(e);
- }
- return convert(peer);
- } else {
- if (bytes.length > 0) {
- return new ReplicationPeerConfig().setClusterKey(Bytes.toString(bytes));
- }
- return new ReplicationPeerConfig().setClusterKey("");
- }
- }
-
- private static ReplicationPeerConfig convert(ZooKeeperProtos.ReplicationPeer peer) {
- ReplicationPeerConfig peerConfig = new ReplicationPeerConfig();
- if (peer.hasClusterkey()) {
- peerConfig.setClusterKey(peer.getClusterkey());
- }
- if (peer.hasReplicationEndpointImpl()) {
- peerConfig.setReplicationEndpointImpl(peer.getReplicationEndpointImpl());
- }
-
- for (BytesBytesPair pair : peer.getDataList()) {
- peerConfig.getPeerData().put(pair.getFirst().toByteArray(), pair.getSecond().toByteArray());
- }
-
- for (NameStringPair pair : peer.getConfigurationList()) {
- peerConfig.getConfiguration().put(pair.getName(), pair.getValue());
- }
- return peerConfig;
- }
-
- private static ZooKeeperProtos.ReplicationPeer convert(ReplicationPeerConfig peerConfig) {
- ZooKeeperProtos.ReplicationPeer.Builder builder = ZooKeeperProtos.ReplicationPeer.newBuilder();
- if (peerConfig.getClusterKey() != null) {
- builder.setClusterkey(peerConfig.getClusterKey());
- }
- if (peerConfig.getReplicationEndpointImpl() != null) {
- builder.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl());
- }
-
- for (Map.Entry<byte[], byte[]> entry : peerConfig.getPeerData().entrySet()) {
- builder.addData(BytesBytesPair.newBuilder()
- .setFirst(ByteString.copyFrom(entry.getKey()))
- .setSecond(ByteString.copyFrom(entry.getValue()))
- .build());
- }
-
- for (Map.Entry<String, String> entry : peerConfig.getConfiguration().entrySet()) {
- builder.addConfiguration(NameStringPair.newBuilder()
- .setName(entry.getKey())
- .setValue(entry.getValue())
- .build());
- }
-
- return builder.build();
- }
-
- /**
- * @param peerConfig
- * @return Serialized protobuf of <code>peerConfig</code> with pb magic prefix prepended suitable
- * for use as content of a this.peersZNode; i.e. the content of PEER_ID znode under
- * /hbase/replication/peers/PEER_ID
- */
- private static byte[] toByteArray(final ReplicationPeerConfig peerConfig) {
- byte[] bytes = convert(peerConfig).toByteArray();
- return ProtobufUtil.prependPBMagic(bytes);
- }
-
private void checkQueuesDeleted(String peerId) throws ReplicationException {
if (queuesClient == null) return;
try {
http://git-wip-us.apache.org/repos/asf/hbase/blob/7f39baf0/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
index a1dc1c8..79853a8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication;
import java.util.List;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -52,6 +53,9 @@ public abstract class ReplicationStateZKBase {
protected final String hfileRefsZNode;
/** The cluster key of the local cluster */
protected final String ourClusterKey;
+ /** The name of the znode that contains tableCFs */
+ protected final String tableCFsNodeName;
+
protected final ZooKeeperWatcher zookeeper;
protected final Configuration conf;
protected final Abortable abortable;
@@ -77,6 +81,7 @@ public abstract class ReplicationStateZKBase {
String hfileRefsZNodeName = conf.get(ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY,
ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT);
this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state");
+ this.tableCFsNodeName = conf.get("zookeeper.znode.replication.peers.tableCFs", "tableCFs");
this.ourClusterKey = ZKConfig.getZooKeeperClusterKey(this.conf);
this.replicationZNode = ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName);
this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName);
@@ -118,4 +123,18 @@ public abstract class ReplicationStateZKBase {
protected boolean isPeerPath(String path) {
return path.split("/").length == peersZNode.split("/").length + 1;
}
+
+ @VisibleForTesting
+ protected String getTableCFsNode(String id) {
+ return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.tableCFsNodeName));
+ }
+
+ @VisibleForTesting
+ protected String getPeerStateNode(String id) {
+ return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.peerStateNodeName));
+ }
+ @VisibleForTesting
+ protected String getPeerNode(String id) {
+ return ZKUtil.joinZNode(this.peersZNode, id);
+ }
}