You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ga...@apache.org on 2015/12/10 00:55:58 UTC
hbase git commit: HBASE-14866 VerifyReplication and ReplicationAdmin
should use full peer configuration for peer connection
Repository: hbase
Updated Branches:
refs/heads/master ba3aa9a9b -> c1e0fcc26
HBASE-14866 VerifyReplication and ReplicationAdmin should use full peer configuration for peer connection
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c1e0fcc2
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c1e0fcc2
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c1e0fcc2
Branch: refs/heads/master
Commit: c1e0fcc26d7e7b10f6ce609e1ff0e4e9378dcf4b
Parents: ba3aa9a
Author: Gary Helmling <ga...@apache.org>
Authored: Wed Dec 9 15:52:27 2015 -0800
Committer: Gary Helmling <ga...@apache.org>
Committed: Wed Dec 9 15:52:27 2015 -0800
----------------------------------------------------------------------
.../client/replication/ReplicationAdmin.java | 14 +-
.../replication/ReplicationPeersZKImpl.java | 7 +-
.../replication/ReplicationStateZKBase.java | 3 +-
.../apache/hadoop/hbase/zookeeper/ZKConfig.java | 155 ----------
.../apache/hadoop/hbase/zookeeper/ZKUtil.java | 124 --------
.../hadoop/hbase/zookeeper/TestZKUtil.java | 11 -
.../apache/hadoop/hbase/HBaseConfiguration.java | 78 ++++-
.../apache/hadoop/hbase/zookeeper/ZKConfig.java | 301 +++++++++++++++++++
.../hadoop/hbase/TestHBaseConfiguration.java | 10 +-
.../hadoop/hbase/zookeeper/TestZKConfig.java | 126 ++++++++
.../hadoop/hbase/mapreduce/SyncTable.java | 15 +-
.../hbase/mapreduce/TableMapReduceUtil.java | 34 ++-
.../hbase/mapreduce/TableOutputFormat.java | 22 +-
.../replication/VerifyReplication.java | 25 +-
.../hbase/util/ServerRegionReplicaUtil.java | 4 +-
.../org/apache/hadoop/hbase/TestZooKeeper.java | 65 ----
.../replication/TestReplicationAdmin.java | 36 ++-
.../replication/TestReplicationEndpoint.java | 10 +-
.../replication/TestReplicationStateBasic.java | 4 +-
.../replication/TestReplicationStateZKImpl.java | 5 +-
.../TestRegionReplicaReplicationEndpoint.java | 8 +-
.../hadoop/hbase/zookeeper/TestZKConfig.java | 45 ---
22 files changed, 620 insertions(+), 482 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
index 8bd1267..a0bea8b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
@@ -53,7 +53,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
@@ -626,7 +625,8 @@ public class ReplicationAdmin implements Closeable {
}
}
- private List<ReplicationPeer> listValidReplicationPeers() {
+ @VisibleForTesting
+ List<ReplicationPeer> listValidReplicationPeers() {
Map<String, ReplicationPeerConfig> peers = listPeerConfigs();
if (peers == null || peers.size() <= 0) {
return null;
@@ -634,18 +634,16 @@ public class ReplicationAdmin implements Closeable {
List<ReplicationPeer> validPeers = new ArrayList<ReplicationPeer>(peers.size());
for (Entry<String, ReplicationPeerConfig> peerEntry : peers.entrySet()) {
String peerId = peerEntry.getKey();
- String clusterKey = peerEntry.getValue().getClusterKey();
- Configuration peerConf = new Configuration(this.connection.getConfiguration());
Stat s = null;
try {
- ZKUtil.applyClusterKeyToConf(peerConf, clusterKey);
Pair<ReplicationPeerConfig, Configuration> pair = this.replicationPeers.getPeerConf(peerId);
+ Configuration peerConf = pair.getSecond();
ReplicationPeer peer = new ReplicationPeerZKImpl(peerConf, peerId, pair.getFirst());
s =
zkw.getRecoverableZooKeeper().exists(peerConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
null);
if (null == s) {
- LOG.info(peerId + ' ' + clusterKey + " is invalid now.");
+ LOG.info(peerId + ' ' + pair.getFirst().getClusterKey() + " is invalid now.");
continue;
}
validPeers.add(peer);
@@ -664,10 +662,6 @@ public class ReplicationAdmin implements Closeable {
LOG.debug("Failure details to get valid replication peers.", e);
Thread.currentThread().interrupt();
continue;
- } catch (IOException e) {
- LOG.warn("Failed to get valid replication peers due to IOException.");
- LOG.debug("Failure details to get valid replication peers.", e);
- continue;
}
}
return validPeers;
http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
index 1884469..63f9ac3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
@@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
@@ -318,11 +319,9 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
return null;
}
- Configuration otherConf = new Configuration(this.conf);
+ Configuration otherConf;
try {
- if (peerConfig.getClusterKey() != null && !peerConfig.getClusterKey().isEmpty()) {
- ZKUtil.applyClusterKeyToConf(otherConf, peerConfig.getClusterKey());
- }
+ otherConf = HBaseConfiguration.createClusterConf(this.conf, peerConfig.getClusterKey());
} catch (IOException e) {
LOG.error("Can't get peer configuration for peerId=" + peerId + " because:", e);
return null;
http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
index 1691b3f..4fbac0f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
+import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
@@ -69,7 +70,7 @@ public abstract class ReplicationStateZKBase {
String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers");
String queuesZNodeName = conf.get("zookeeper.znode.replication.rs", "rs");
this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state");
- this.ourClusterKey = ZKUtil.getZooKeeperClusterKey(this.conf);
+ this.ourClusterKey = ZKConfig.getZooKeeperClusterKey(this.conf);
this.replicationZNode = ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName);
this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName);
this.queuesZNode = ZKUtil.joinZNode(replicationZNode, queuesZNodeName);
http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java
deleted file mode 100644
index a8f1182..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.zookeeper;
-
-import java.util.Map.Entry;
-import java.util.Properties;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
-/**
- * Utility methods for reading, and building the ZooKeeper configuration.
- *
- * The order and priority for reading the config are as follows:
- * (1). Property with "hbase.zookeeper.property." prefix from HBase XML
- * (2). other zookeeper related properties in HBASE XML
- */
-@InterfaceAudience.Private
-public class ZKConfig {
-
- private static final String VARIABLE_START = "${";
-
- /**
- * Make a Properties object holding ZooKeeper config.
- * Parses the corresponding config options from the HBase XML configs
- * and generates the appropriate ZooKeeper properties.
- * @param conf Configuration to read from.
- * @return Properties holding mappings representing ZooKeeper config file.
- */
- public static Properties makeZKProps(Configuration conf) {
- return makeZKPropsFromHbaseConfig(conf);
- }
-
- /**
- * Make a Properties object holding ZooKeeper config.
- * Parses the corresponding config options from the HBase XML configs
- * and generates the appropriate ZooKeeper properties.
- *
- * @param conf Configuration to read from.
- * @return Properties holding mappings representing ZooKeeper config file.
- */
- private static Properties makeZKPropsFromHbaseConfig(Configuration conf) {
- Properties zkProperties = new Properties();
-
- // Directly map all of the hbase.zookeeper.property.KEY properties.
- // Synchronize on conf so no loading of configs while we iterate
- synchronized (conf) {
- for (Entry<String, String> entry : conf) {
- String key = entry.getKey();
- if (key.startsWith(HConstants.ZK_CFG_PROPERTY_PREFIX)) {
- String zkKey = key.substring(HConstants.ZK_CFG_PROPERTY_PREFIX_LEN);
- String value = entry.getValue();
- // If the value has variables substitutions, need to do a get.
- if (value.contains(VARIABLE_START)) {
- value = conf.get(key);
- }
- zkProperties.setProperty(zkKey, value);
- }
- }
- }
-
- // If clientPort is not set, assign the default.
- if (zkProperties.getProperty(HConstants.CLIENT_PORT_STR) == null) {
- zkProperties.put(HConstants.CLIENT_PORT_STR,
- HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT);
- }
-
- // Create the server.X properties.
- int peerPort = conf.getInt("hbase.zookeeper.peerport", 2888);
- int leaderPort = conf.getInt("hbase.zookeeper.leaderport", 3888);
-
- final String[] serverHosts = conf.getStrings(HConstants.ZOOKEEPER_QUORUM,
- HConstants.LOCALHOST);
- String serverHost;
- String address;
- String key;
- for (int i = 0; i < serverHosts.length; ++i) {
- if (serverHosts[i].contains(":")) {
- serverHost = serverHosts[i].substring(0, serverHosts[i].indexOf(':'));
- } else {
- serverHost = serverHosts[i];
- }
- address = serverHost + ":" + peerPort + ":" + leaderPort;
- key = "server." + i;
- zkProperties.put(key, address);
- }
-
- return zkProperties;
- }
-
- /**
- * Return the ZK Quorum servers string given the specified configuration
- *
- * @param conf
- * @return Quorum servers String
- */
- private static String getZKQuorumServersStringFromHbaseConfig(Configuration conf) {
- String defaultClientPort = Integer.toString(
- conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT));
-
- // Build the ZK quorum server string with "server:clientport" list, separated by ','
- final String[] serverHosts =
- conf.getStrings(HConstants.ZOOKEEPER_QUORUM, HConstants.LOCALHOST);
- return buildQuorumServerString(serverHosts, defaultClientPort);
- }
-
- /**
- * Build the ZK quorum server string with "server:clientport" list, separated by ','
- *
- * @param serverHosts a list of servers for ZK quorum
- * @param clientPort the default client port
- * @return the string for a list of "server:port" separated by ","
- */
- public static String buildQuorumServerString(String[] serverHosts, String clientPort) {
- StringBuilder quorumStringBuilder = new StringBuilder();
- String serverHost;
- for (int i = 0; i < serverHosts.length; ++i) {
- if (serverHosts[i].contains(":")) {
- serverHost = serverHosts[i]; // just use the port specified from the input
- } else {
- serverHost = serverHosts[i] + ":" + clientPort;
- }
- if (i > 0) {
- quorumStringBuilder.append(',');
- }
- quorumStringBuilder.append(serverHost);
- }
- return quorumStringBuilder.toString();
- }
-
- /**
- * Return the ZK Quorum servers string given the specified configuration.
- * @return Quorum servers
- */
- public static String getZKQuorumServersString(Configuration conf) {
- return getZKQuorumServersStringFromHbaseConfig(conf);
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
index 633525f..c268268 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
@@ -76,7 +76,6 @@ import org.apache.zookeeper.proto.DeleteRequest;
import org.apache.zookeeper.proto.SetDataRequest;
import org.apache.zookeeper.server.ZooKeeperSaslServer;
-import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.InvalidProtocolBufferException;
/**
@@ -96,25 +95,6 @@ public class ZKUtil {
public static final char ZNODE_PATH_SEPARATOR = '/';
private static int zkDumpConnectionTimeOut;
- // The Quorum for the ZK cluster can have one the following format (see examples below):
- // (1). s1,s2,s3 (no client port in the list, the client port could be obtained from clientPort)
- // (2). s1:p1,s2:p2,s3:p3 (with client port, which could be same or different for each server,
- // in this case, the clientPort would be ignored)
- // (3). s1:p1,s2,s3:p3 (mix of (1) and (2) - if port is not specified in a server, it would use
- // the clientPort; otherwise, it would use the specified port)
- @VisibleForTesting
- public static class ZKClusterKey {
- public String quorumString;
- public int clientPort;
- public String znodeParent;
-
- ZKClusterKey(String quorumString, int clientPort, String znodeParent) {
- this.quorumString = quorumString;
- this.clientPort = clientPort;
- this.znodeParent = znodeParent;
- }
- }
-
/**
* Creates a new connection to ZooKeeper, pulling settings and ensemble config
* from the specified configuration object using methods from {@link ZKConfig}.
@@ -361,110 +341,6 @@ public class ZKUtil {
return path.substring(path.lastIndexOf("/")+1);
}
- /**
- * Get the key to the ZK ensemble for this configuration without
- * adding a name at the end
- * @param conf Configuration to use to build the key
- * @return ensemble key without a name
- */
- public static String getZooKeeperClusterKey(Configuration conf) {
- return getZooKeeperClusterKey(conf, null);
- }
-
- /**
- * Get the key to the ZK ensemble for this configuration and append
- * a name at the end
- * @param conf Configuration to use to build the key
- * @param name Name that should be appended at the end if not empty or null
- * @return ensemble key with a name (if any)
- */
- public static String getZooKeeperClusterKey(Configuration conf, String name) {
- String ensemble = conf.get(HConstants.ZOOKEEPER_QUORUM).replaceAll(
- "[\\t\\n\\x0B\\f\\r]", "");
- StringBuilder builder = new StringBuilder(ensemble);
- builder.append(":");
- builder.append(conf.get(HConstants.ZOOKEEPER_CLIENT_PORT));
- builder.append(":");
- builder.append(conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
- if (name != null && !name.isEmpty()) {
- builder.append(",");
- builder.append(name);
- }
- return builder.toString();
- }
-
- /**
- * Apply the settings in the given key to the given configuration, this is
- * used to communicate with distant clusters
- * @param conf configuration object to configure
- * @param key string that contains the 3 required configuratins
- * @throws IOException
- */
- public static void applyClusterKeyToConf(Configuration conf, String key)
- throws IOException{
- ZKClusterKey zkClusterKey = transformClusterKey(key);
- conf.set(HConstants.ZOOKEEPER_QUORUM, zkClusterKey.quorumString);
- conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClusterKey.clientPort);
- conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, zkClusterKey.znodeParent);
- }
-
- /**
- * Separate the given key into the three configurations it should contain:
- * hbase.zookeeper.quorum, hbase.zookeeper.client.port
- * and zookeeper.znode.parent
- * @param key
- * @return the three configuration in the described order
- * @throws IOException
- */
- public static ZKClusterKey transformClusterKey(String key) throws IOException {
- String[] parts = key.split(":");
-
- if (parts.length == 3) {
- return new ZKClusterKey(parts [0], Integer.parseInt(parts [1]), parts [2]);
- }
-
- if (parts.length > 3) {
- // The quorum could contain client port in server:clientport format, try to transform more.
- String zNodeParent = parts [parts.length - 1];
- String clientPort = parts [parts.length - 2];
-
- // The first part length is the total length minus the lengths of other parts and minus 2 ":"
- int endQuorumIndex = key.length() - zNodeParent.length() - clientPort.length() - 2;
- String quorumStringInput = key.substring(0, endQuorumIndex);
- String[] serverHosts = quorumStringInput.split(",");
-
- // The common case is that every server has its own client port specified - this means
- // that (total parts - the ZNodeParent part - the ClientPort part) is equal to
- // (the number of "," + 1) - "+ 1" because the last server has no ",".
- if ((parts.length - 2) == (serverHosts.length + 1)) {
- return new ZKClusterKey(quorumStringInput, Integer.parseInt(clientPort), zNodeParent);
- }
-
- // For the uncommon case that some servers has no port specified, we need to build the
- // server:clientport list using default client port for servers without specified port.
- return new ZKClusterKey(
- ZKConfig.buildQuorumServerString(serverHosts, clientPort),
- Integer.parseInt(clientPort),
- zNodeParent);
- }
-
- throw new IOException("Cluster key passed " + key + " is invalid, the format should be:" +
- HConstants.ZOOKEEPER_QUORUM + ":" + HConstants.ZOOKEEPER_CLIENT_PORT + ":"
- + HConstants.ZOOKEEPER_ZNODE_PARENT);
- }
-
- /**
- * Standardize the ZK quorum string: make it a "server:clientport" list, separated by ','
- * @param quorumStringInput a string contains a list of servers for ZK quorum
- * @param clientPort the default client port
- * @return the string for a list of "server:port" separated by ","
- */
- @VisibleForTesting
- public static String standardizeQuorumServerString(String quorumStringInput, String clientPort) {
- String[] serverHosts = quorumStringInput.split(",");
- return ZKConfig.buildQuorumServerString(serverHosts, clientPort);
- }
-
//
// Existence checks and watches
//
http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java
index 72de935..eb629f2 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java
@@ -41,17 +41,6 @@ import org.junit.experimental.categories.Category;
public class TestZKUtil {
@Test
- public void testGetZooKeeperClusterKey() {
- Configuration conf = HBaseConfiguration.create();
- conf.set(HConstants.ZOOKEEPER_QUORUM, "\tlocalhost\n");
- conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "3333");
- conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "hbase");
- String clusterKey = ZKUtil.getZooKeeperClusterKey(conf, "test");
- Assert.assertTrue(!clusterKey.contains("\t") && !clusterKey.contains("\n"));
- Assert.assertEquals("localhost:3333:hbase,test", clusterKey);
- }
-
- @Test
public void testCreateACL() throws ZooKeeperConnectionException, IOException {
Configuration conf = HBaseConfiguration.create();
conf.set(Superusers.SUPERUSER_CONF_KEY, "user1,@group1,user2,@group2,user3");
http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java
index 7a037f4..7b94c3d 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java
@@ -20,7 +20,7 @@ package org.apache.hadoop.hbase;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
-import java.util.Map.Entry;
+import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
import org.apache.hadoop.hbase.util.VersionInfo;
+import org.apache.hadoop.hbase.zookeeper.ZKConfig;
/**
* Adds HBase configuration files to a Configuration
@@ -115,7 +116,7 @@ public class HBaseConfiguration extends Configuration {
* @param srcConf the source configuration
**/
public static void merge(Configuration destConf, Configuration srcConf) {
- for (Entry<String, String> e : srcConf) {
+ for (Map.Entry<String, String> e : srcConf) {
destConf.set(e.getKey(), e.getValue());
}
}
@@ -129,7 +130,7 @@ public class HBaseConfiguration extends Configuration {
*/
public static Configuration subset(Configuration srcConf, String prefix) {
Configuration newConf = new Configuration(false);
- for (Entry<String, String> entry : srcConf) {
+ for (Map.Entry<String, String> entry : srcConf) {
if (entry.getKey().startsWith(prefix)) {
String newKey = entry.getKey().substring(prefix.length());
// avoid entries that would produce an empty key
@@ -142,6 +143,18 @@ public class HBaseConfiguration extends Configuration {
}
/**
+ * Sets all the entries in the provided {@code Map<String, String>} as properties in the
+ * given {@code Configuration}. Each property will have the specified prefix prepended,
+ * so that the configuration entries are keyed by {@code prefix + entry.getKey()}.
+ */
+ public static void setWithPrefix(Configuration conf, String prefix,
+ Iterable<Map.Entry<String, String>> properties) {
+ for (Map.Entry<String, String> entry : properties) {
+ conf.set(prefix + entry.getKey(), entry.getValue());
+ }
+ }
+
+ /**
* @return whether to show HBase Configuration in servlet
*/
public static boolean isShowConfInServlet() {
@@ -236,6 +249,65 @@ public class HBaseConfiguration extends Configuration {
}
/**
+ * Generates a {@link Configuration} instance by applying the ZooKeeper cluster key
+ * to the base Configuration. Note that additional configuration properties may be needed
+ * for a remote cluster, so it is preferable to use
+ * {@link #createClusterConf(Configuration, String, String)}.
+ *
+ * @param baseConf the base configuration to use, containing prefixed override properties
+ * @param clusterKey the ZooKeeper quorum cluster key to apply, or {@code null} if none
+ *
+ * @return the merged configuration with override properties and cluster key applied
+ *
+ * @see #createClusterConf(Configuration, String, String)
+ */
+ public static Configuration createClusterConf(Configuration baseConf, String clusterKey)
+ throws IOException {
+ return createClusterConf(baseConf, clusterKey, null);
+ }
+
+ /**
+ * Generates a {@link Configuration} instance by applying property overrides prefixed by
+ * a cluster profile key to the base Configuration. Override properties are extracted by
+ * the {@link #subset(Configuration, String)} method, then the merged on top of the base
+ * Configuration and returned.
+ *
+ * @param baseConf the base configuration to use, containing prefixed override properties
+ * @param clusterKey the ZooKeeper quorum cluster key to apply, or {@code null} if none
+ * @param overridePrefix the property key prefix to match for override properties,
+ * or {@code null} if none
+ * @return the merged configuration with override properties and cluster key applied
+ */
+ public static Configuration createClusterConf(Configuration baseConf, String clusterKey,
+ String overridePrefix) throws IOException {
+ Configuration clusterConf = HBaseConfiguration.create(baseConf);
+ if (clusterKey != null && !clusterKey.isEmpty()) {
+ applyClusterKeyToConf(clusterConf, clusterKey);
+ }
+
+ if (overridePrefix != null && !overridePrefix.isEmpty()) {
+ Configuration clusterSubset = HBaseConfiguration.subset(clusterConf, overridePrefix);
+ HBaseConfiguration.merge(clusterConf, clusterSubset);
+ }
+ return clusterConf;
+ }
+
+ /**
+ * Apply the settings in the given key to the given configuration, this is
+ * used to communicate with distant clusters
+ * @param conf configuration object to configure
+ * @param key string that contains the 3 required configuratins
+ * @throws IOException
+ */
+ private static void applyClusterKeyToConf(Configuration conf, String key)
+ throws IOException{
+ ZKConfig.ZKClusterKey zkClusterKey = ZKConfig.transformClusterKey(key);
+ conf.set(HConstants.ZOOKEEPER_QUORUM, zkClusterKey.getQuorumString());
+ conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClusterKey.getClientPort());
+ conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, zkClusterKey.getZnodeParent());
+ }
+
+ /**
* For debugging. Dump configurations to system output as xml format.
* Master and RS configurations can also be dumped using
* http services. e.g. "curl http://master:16010/dump"
http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-common/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java
new file mode 100644
index 0000000..fe7396a
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java
@@ -0,0 +1,301 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.io.IOException;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Utility methods for reading, and building the ZooKeeper configuration.
+ *
+ * The order and priority for reading the config are as follows:
+ * (1). Property with "hbase.zookeeper.property." prefix from HBase XML
+ * (2). other zookeeper related properties in HBASE XML
+ */
+@InterfaceAudience.Private
+public final class ZKConfig {
+
+ private static final String VARIABLE_START = "${";
+
+ private ZKConfig() {
+ }
+
+ /**
+ * Make a Properties object holding ZooKeeper config.
+ * Parses the corresponding config options from the HBase XML configs
+ * and generates the appropriate ZooKeeper properties.
+ * @param conf Configuration to read from.
+ * @return Properties holding mappings representing ZooKeeper config file.
+ */
+ public static Properties makeZKProps(Configuration conf) {
+ return makeZKPropsFromHbaseConfig(conf);
+ }
+
+ /**
+ * Make a Properties object holding ZooKeeper config.
+ * Parses the corresponding config options from the HBase XML configs
+ * and generates the appropriate ZooKeeper properties.
+ *
+ * @param conf Configuration to read from.
+ * @return Properties holding mappings representing ZooKeeper config file.
+ */
+ private static Properties makeZKPropsFromHbaseConfig(Configuration conf) {
+ Properties zkProperties = new Properties();
+
+ // Directly map all of the hbase.zookeeper.property.KEY properties.
+ // Synchronize on conf so no loading of configs while we iterate
+ synchronized (conf) {
+ for (Entry<String, String> entry : conf) {
+ String key = entry.getKey();
+ if (key.startsWith(HConstants.ZK_CFG_PROPERTY_PREFIX)) {
+ String zkKey = key.substring(HConstants.ZK_CFG_PROPERTY_PREFIX_LEN);
+ String value = entry.getValue();
+ // If the value has variables substitutions, need to do a get.
+ if (value.contains(VARIABLE_START)) {
+ value = conf.get(key);
+ }
+ zkProperties.setProperty(zkKey, value);
+ }
+ }
+ }
+
+ // If clientPort is not set, assign the default.
+ if (zkProperties.getProperty(HConstants.CLIENT_PORT_STR) == null) {
+ zkProperties.put(HConstants.CLIENT_PORT_STR,
+ HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT);
+ }
+
+ // Create the server.X properties.
+ int peerPort = conf.getInt("hbase.zookeeper.peerport", 2888);
+ int leaderPort = conf.getInt("hbase.zookeeper.leaderport", 3888);
+
+ final String[] serverHosts = conf.getStrings(HConstants.ZOOKEEPER_QUORUM,
+ HConstants.LOCALHOST);
+ String serverHost;
+ String address;
+ String key;
+ for (int i = 0; i < serverHosts.length; ++i) {
+ if (serverHosts[i].contains(":")) {
+ serverHost = serverHosts[i].substring(0, serverHosts[i].indexOf(':'));
+ } else {
+ serverHost = serverHosts[i];
+ }
+ address = serverHost + ":" + peerPort + ":" + leaderPort;
+ key = "server." + i;
+ zkProperties.put(key, address);
+ }
+
+ return zkProperties;
+ }
+
+ /**
+ * Return the ZK Quorum servers string given the specified configuration
+ *
+ * @param conf
+ * @return Quorum servers String
+ */
+ private static String getZKQuorumServersStringFromHbaseConfig(Configuration conf) {
+ String defaultClientPort = Integer.toString(
+ conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT));
+
+ // Build the ZK quorum server string with "server:clientport" list, separated by ','
+ final String[] serverHosts =
+ conf.getStrings(HConstants.ZOOKEEPER_QUORUM, HConstants.LOCALHOST);
+ return buildZKQuorumServerString(serverHosts, defaultClientPort);
+ }
+
+ /**
+ * Return the ZK Quorum servers string given the specified configuration.
+ * @return Quorum servers
+ */
+ public static String getZKQuorumServersString(Configuration conf) {
+ return getZKQuorumServersStringFromHbaseConfig(conf);
+ }
+
+ /**
+ * Build the ZK quorum server string with "server:clientport" list, separated by ','
+ *
+ * @param serverHosts a list of servers for ZK quorum
+ * @param clientPort the default client port
+ * @return the string for a list of "server:port" separated by ","
+ */
+ public static String buildZKQuorumServerString(String[] serverHosts, String clientPort) {
+ StringBuilder quorumStringBuilder = new StringBuilder();
+ String serverHost;
+ for (int i = 0; i < serverHosts.length; ++i) {
+ if (serverHosts[i].contains(":")) {
+ serverHost = serverHosts[i]; // just use the port specified from the input
+ } else {
+ serverHost = serverHosts[i] + ":" + clientPort;
+ }
+ if (i > 0) {
+ quorumStringBuilder.append(',');
+ }
+ quorumStringBuilder.append(serverHost);
+ }
+ return quorumStringBuilder.toString();
+ }
+
+ /**
+ * Verifies that the given key matches the expected format for a ZooKeeper cluster key.
+ * The Quorum for the ZK cluster can have one the following formats (see examples below):
+ *
+ * <ol>
+ * <li>s1,s2,s3 (no client port in the list, the client port could be obtained from
+ * clientPort)</li>
+ * <li>s1:p1,s2:p2,s3:p3 (with client port, which could be same or different for each server,
+ * in this case, the clientPort would be ignored)</li>
+ * <li>s1:p1,s2,s3:p3 (mix of (1) and (2) - if port is not specified in a server, it would use
+ * the clientPort; otherwise, it would use the specified port)</li>
+ * </ol>
+ *
+ * @param key the cluster key to validate
+ * @throws IOException if the key could not be parsed
+ */
+ public static void validateClusterKey(String key) throws IOException {
+ transformClusterKey(key);
+ }
+
+ /**
+ * Separate the given key into the three configurations it should contain:
+ * hbase.zookeeper.quorum, hbase.zookeeper.client.port
+ * and zookeeper.znode.parent
+ * @param key
+ * @return the three configuration in the described order
+ * @throws IOException
+ */
+ public static ZKClusterKey transformClusterKey(String key) throws IOException {
+ String[] parts = key.split(":");
+
+ if (parts.length == 3) {
+ return new ZKClusterKey(parts [0], Integer.parseInt(parts [1]), parts [2]);
+ }
+
+ if (parts.length > 3) {
+ // The quorum could contain client port in server:clientport format, try to transform more.
+ String zNodeParent = parts [parts.length - 1];
+ String clientPort = parts [parts.length - 2];
+
+ // The first part length is the total length minus the lengths of other parts and minus 2 ":"
+ int endQuorumIndex = key.length() - zNodeParent.length() - clientPort.length() - 2;
+ String quorumStringInput = key.substring(0, endQuorumIndex);
+ String[] serverHosts = quorumStringInput.split(",");
+
+ // The common case is that every server has its own client port specified - this means
+ // that (total parts - the ZNodeParent part - the ClientPort part) is equal to
+ // (the number of "," + 1) - "+ 1" because the last server has no ",".
+ if ((parts.length - 2) == (serverHosts.length + 1)) {
+ return new ZKClusterKey(quorumStringInput, Integer.parseInt(clientPort), zNodeParent);
+ }
+
+ // For the uncommon case that some servers has no port specified, we need to build the
+ // server:clientport list using default client port for servers without specified port.
+ return new ZKClusterKey(
+ buildZKQuorumServerString(serverHosts, clientPort),
+ Integer.parseInt(clientPort),
+ zNodeParent);
+ }
+
+ throw new IOException("Cluster key passed " + key + " is invalid, the format should be:" +
+ HConstants.ZOOKEEPER_QUORUM + ":" + HConstants.ZOOKEEPER_CLIENT_PORT + ":"
+ + HConstants.ZOOKEEPER_ZNODE_PARENT);
+ }
+
+ /**
+ * Get the key to the ZK ensemble for this configuration without
+ * adding a name at the end
+ * @param conf Configuration to use to build the key
+ * @return ensemble key without a name
+ */
+ public static String getZooKeeperClusterKey(Configuration conf) {
+ return getZooKeeperClusterKey(conf, null);
+ }
+
+ /**
+ * Get the key to the ZK ensemble for this configuration and append
+ * a name at the end
+ * @param conf Configuration to use to build the key
+ * @param name Name that should be appended at the end if not empty or null
+ * @return ensemble key with a name (if any)
+ */
+ public static String getZooKeeperClusterKey(Configuration conf, String name) {
+ String ensemble = conf.get(HConstants.ZOOKEEPER_QUORUM).replaceAll(
+ "[\\t\\n\\x0B\\f\\r]", "");
+ StringBuilder builder = new StringBuilder(ensemble);
+ builder.append(":");
+ builder.append(conf.get(HConstants.ZOOKEEPER_CLIENT_PORT));
+ builder.append(":");
+ builder.append(conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
+ if (name != null && !name.isEmpty()) {
+ builder.append(",");
+ builder.append(name);
+ }
+ return builder.toString();
+ }
+
+ /**
+ * Standardize the ZK quorum string: make it a "server:clientport" list, separated by ','
+ * @param quorumStringInput a string contains a list of servers for ZK quorum
+ * @param clientPort the default client port
+ * @return the string for a list of "server:port" separated by ","
+ */
+ @VisibleForTesting
+ public static String standardizeZKQuorumServerString(String quorumStringInput,
+ String clientPort) {
+ String[] serverHosts = quorumStringInput.split(",");
+ return buildZKQuorumServerString(serverHosts, clientPort);
+ }
+
+ // The Quorum for the ZK cluster can have one the following format (see examples below):
+ // (1). s1,s2,s3 (no client port in the list, the client port could be obtained from clientPort)
+ // (2). s1:p1,s2:p2,s3:p3 (with client port, which could be same or different for each server,
+ // in this case, the clientPort would be ignored)
+ // (3). s1:p1,s2,s3:p3 (mix of (1) and (2) - if port is not specified in a server, it would use
+ // the clientPort; otherwise, it would use the specified port)
+ @VisibleForTesting
+ public static class ZKClusterKey {
+ private String quorumString;
+ private int clientPort;
+ private String znodeParent;
+
+ ZKClusterKey(String quorumString, int clientPort, String znodeParent) {
+ this.quorumString = quorumString;
+ this.clientPort = clientPort;
+ this.znodeParent = znodeParent;
+ }
+
+ public String getQuorumString() {
+ return quorumString;
+ }
+
+ public int getClientPort() {
+ return clientPort;
+ }
+
+ public String getZnodeParent() {
+ return znodeParent;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-common/src/test/java/org/apache/hadoop/hbase/TestHBaseConfiguration.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestHBaseConfiguration.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestHBaseConfiguration.java
index c11916f..6c14ef9 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestHBaseConfiguration.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestHBaseConfiguration.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
@@ -27,11 +28,13 @@ import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.List;
+import com.google.common.collect.ImmutableMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -73,8 +76,11 @@ public class TestHBaseConfiguration {
String prefix = "hbase.mapred.output.";
conf.set("hbase.security.authentication", "kerberos");
conf.set("hbase.regionserver.kerberos.principal", "hbasesource");
- conf.set(prefix + "hbase.regionserver.kerberos.principal", "hbasedest");
- conf.set(prefix, "shouldbemissing");
+ HBaseConfiguration.setWithPrefix(conf, prefix,
+ ImmutableMap.of(
+ "hbase.regionserver.kerberos.principal", "hbasedest",
+ "", "shouldbemissing")
+ .entrySet());
Configuration subsetConf = HBaseConfiguration.subset(conf, prefix);
assertNull(subsetConf.get(prefix + "hbase.regionserver.kerberos.principal"));
http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-common/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKConfig.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKConfig.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKConfig.java
new file mode 100644
index 0000000..7879aea
--- /dev/null
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKConfig.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@Category({MiscTests.class, SmallTests.class})
+public class TestZKConfig {
+
+ @Test
+ public void testZKConfigLoading() throws Exception {
+ Configuration conf = HBaseConfiguration.create();
+ // Test that we read only from the config instance
+ // (i.e. via hbase-default.xml and hbase-site.xml)
+ conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, 2181);
+ Properties props = ZKConfig.makeZKProps(conf);
+ assertEquals("Property client port should have been default from the HBase config",
+ "2181",
+ props.getProperty("clientPort"));
+ }
+
+ @Test
+ public void testGetZooKeeperClusterKey() {
+ Configuration conf = HBaseConfiguration.create();
+ conf.set(HConstants.ZOOKEEPER_QUORUM, "\tlocalhost\n");
+ conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "3333");
+ conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "hbase");
+ String clusterKey = ZKConfig.getZooKeeperClusterKey(conf, "test");
+ assertTrue(!clusterKey.contains("\t") && !clusterKey.contains("\n"));
+ assertEquals("localhost:3333:hbase,test", clusterKey);
+ }
+
+ @Test
+ public void testClusterKey() throws Exception {
+ testKey("server", 2181, "hbase");
+ testKey("server1,server2,server3", 2181, "hbase");
+ try {
+ ZKConfig.validateClusterKey("2181:hbase");
+ } catch (IOException ex) {
+ // OK
+ }
+ }
+
+ @Test
+ public void testClusterKeyWithMultiplePorts() throws Exception {
+ // server has different port than the default port
+ testKey("server1:2182", 2181, "hbase", true);
+ // multiple servers have their own port
+ testKey("server1:2182,server2:2183,server3:2184", 2181, "hbase", true);
+ // one server has no specified port, should use default port
+ testKey("server1:2182,server2,server3:2184", 2181, "hbase", true);
+ // the last server has no specified port, should use default port
+ testKey("server1:2182,server2:2183,server3", 2181, "hbase", true);
+ // multiple servers have no specified port, should use default port for those servers
+ testKey("server1:2182,server2,server3:2184,server4", 2181, "hbase", true);
+ // same server, different ports
+ testKey("server1:2182,server1:2183,server1", 2181, "hbase", true);
+ // mix of same server/different port and different server
+ testKey("server1:2182,server2:2183,server1", 2181, "hbase", true);
+ }
+
+ private void testKey(String ensemble, int port, String znode)
+ throws IOException {
+ testKey(ensemble, port, znode, false); // not support multiple client ports
+ }
+
+ private void testKey(String ensemble, int port, String znode, Boolean multiplePortSupport)
+ throws IOException {
+ Configuration conf = new Configuration();
+ String key = ensemble+":"+port+":"+znode;
+ String ensemble2 = null;
+ ZKConfig.ZKClusterKey zkClusterKey = ZKConfig.transformClusterKey(key);
+ if (multiplePortSupport) {
+ ensemble2 = ZKConfig.standardizeZKQuorumServerString(ensemble,
+ Integer.toString(port));
+ assertEquals(ensemble2, zkClusterKey.getQuorumString());
+ }
+ else {
+ assertEquals(ensemble, zkClusterKey.getQuorumString());
+ }
+ assertEquals(port, zkClusterKey.getClientPort());
+ assertEquals(znode, zkClusterKey.getZnodeParent());
+
+ conf = HBaseConfiguration.createClusterConf(conf, key);
+ assertEquals(zkClusterKey.getQuorumString(), conf.get(HConstants.ZOOKEEPER_QUORUM));
+ assertEquals(zkClusterKey.getClientPort(), conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, -1));
+ assertEquals(zkClusterKey.getZnodeParent(), conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
+
+ String reconstructedKey = ZKConfig.getZooKeeperClusterKey(conf);
+ if (multiplePortSupport) {
+ String key2 = ensemble2 + ":" + port + ":" + znode;
+ assertEquals(key2, reconstructedKey);
+ }
+ else {
+ assertEquals(key, reconstructedKey);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java
index 20d6e24..1658ba4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java
@@ -43,7 +43,6 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
@@ -174,8 +173,9 @@ public class SyncTable extends Configured implements Tool {
Configuration conf = context.getConfiguration();
sourceHashDir = new Path(conf.get(SOURCE_HASH_DIR_CONF_KEY));
- sourceConnection = openConnection(conf, SOURCE_ZK_CLUSTER_CONF_KEY);
- targetConnection = openConnection(conf, TARGET_ZK_CLUSTER_CONF_KEY);
+ sourceConnection = openConnection(conf, SOURCE_ZK_CLUSTER_CONF_KEY, null);
+ targetConnection = openConnection(conf, TARGET_ZK_CLUSTER_CONF_KEY,
+ TableOutputFormat.OUTPUT_CONF_PREFIX);
sourceTable = openTable(sourceConnection, conf, SOURCE_TABLE_CONF_KEY);
targetTable = openTable(targetConnection, conf, TARGET_TABLE_CONF_KEY);
dryRun = conf.getBoolean(SOURCE_TABLE_CONF_KEY, false);
@@ -196,13 +196,12 @@ public class SyncTable extends Configured implements Tool {
targetHasher = new HashTable.ResultHasher();
}
- private static Connection openConnection(Configuration conf, String zkClusterConfKey)
+ private static Connection openConnection(Configuration conf, String zkClusterConfKey,
+ String configPrefix)
throws IOException {
- Configuration clusterConf = new Configuration(conf);
String zkCluster = conf.get(zkClusterConfKey);
- if (zkCluster != null) {
- ZKUtil.applyClusterKeyToConf(clusterConf, zkCluster);
- }
+ Configuration clusterConf = HBaseConfiguration.createClusterConf(conf,
+ zkCluster, configPrefix);
return ConnectionFactory.createConnection(clusterConf);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
index cc8a35c..a48871f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
@@ -43,12 +43,11 @@ import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.TokenUtil;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.StringUtils;
-import com.google.protobuf.InvalidProtocolBufferException;
import java.io.File;
import java.io.IOException;
@@ -485,12 +484,8 @@ public class TableMapReduceUtil {
String quorumAddress = job.getConfiguration().get(TableOutputFormat.QUORUM_ADDRESS);
User user = userProvider.getCurrent();
if (quorumAddress != null) {
- Configuration peerConf = HBaseConfiguration.create(job.getConfiguration());
- ZKUtil.applyClusterKeyToConf(peerConf, quorumAddress);
- // apply any "hbase.mapred.output." configuration overrides
- Configuration outputOverrides =
- HBaseConfiguration.subset(peerConf, TableOutputFormat.OUTPUT_CONF_PREFIX);
- HBaseConfiguration.merge(peerConf, outputOverrides);
+ Configuration peerConf = HBaseConfiguration.createClusterConf(job.getConfiguration(),
+ quorumAddress, TableOutputFormat.OUTPUT_CONF_PREFIX);
Connection peerConn = ConnectionFactory.createConnection(peerConf);
try {
TokenUtil.addTokenForJob(peerConn, user, job);
@@ -523,15 +518,30 @@ public class TableMapReduceUtil {
* @param job The job that requires the permission.
* @param quorumAddress string that contains the 3 required configuratins
* @throws IOException When the authentication token cannot be obtained.
+ * @deprecated Since 1.2.0, use {@link #initCredentialsForCluster(Job, Configuration)} instead.
*/
+ @Deprecated
public static void initCredentialsForCluster(Job job, String quorumAddress)
throws IOException {
+ Configuration peerConf = HBaseConfiguration.createClusterConf(job.getConfiguration(),
+ quorumAddress);
+ initCredentialsForCluster(job, peerConf);
+ }
+
+ /**
+ * Obtain an authentication token, for the specified cluster, on behalf of the current user
+ * and add it to the credentials for the given map reduce job.
+ *
+ * @param job The job that requires the permission.
+ * @param conf The configuration to use in connecting to the peer cluster
+ * @throws IOException When the authentication token cannot be obtained.
+ */
+ public static void initCredentialsForCluster(Job job, Configuration conf)
+ throws IOException {
UserProvider userProvider = UserProvider.instantiate(job.getConfiguration());
if (userProvider.isHBaseSecurityEnabled()) {
try {
- Configuration peerConf = HBaseConfiguration.create(job.getConfiguration());
- ZKUtil.applyClusterKeyToConf(peerConf, quorumAddress);
- Connection peerConn = ConnectionFactory.createConnection(peerConf);
+ Connection peerConn = ConnectionFactory.createConnection(conf);
try {
TokenUtil.addTokenForJob(peerConn, userProvider.getCurrent(), job);
} finally {
@@ -680,7 +690,7 @@ public class TableMapReduceUtil {
// If passed a quorum/ensemble address, pass it on to TableOutputFormat.
if (quorumAddress != null) {
// Calling this will validate the format
- ZKUtil.transformClusterKey(quorumAddress);
+ ZKConfig.validateClusterKey(quorumAddress);
conf.set(TableOutputFormat.QUORUM_ADDRESS,quorumAddress);
}
if (serverClass != null && serverImpl != null) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
index 201e78f..998d700 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
@@ -35,7 +35,6 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
@@ -195,22 +194,19 @@ implements Configurable {
@Override
public void setConf(Configuration otherConf) {
- this.conf = HBaseConfiguration.create(otherConf);
-
- String tableName = this.conf.get(OUTPUT_TABLE);
+ String tableName = otherConf.get(OUTPUT_TABLE);
if(tableName == null || tableName.length() <= 0) {
throw new IllegalArgumentException("Must specify table name");
}
- String address = this.conf.get(QUORUM_ADDRESS);
- int zkClientPort = this.conf.getInt(QUORUM_PORT, 0);
- String serverClass = this.conf.get(REGION_SERVER_CLASS);
- String serverImpl = this.conf.get(REGION_SERVER_IMPL);
+ String address = otherConf.get(QUORUM_ADDRESS);
+ int zkClientPort = otherConf.getInt(QUORUM_PORT, 0);
+ String serverClass = otherConf.get(REGION_SERVER_CLASS);
+ String serverImpl = otherConf.get(REGION_SERVER_IMPL);
try {
- if (address != null) {
- ZKUtil.applyClusterKeyToConf(this.conf, address);
- }
+ this.conf = HBaseConfiguration.createClusterConf(otherConf, address, OUTPUT_CONF_PREFIX);
+
if (serverClass != null) {
this.conf.set(HConstants.REGION_SERVER_IMPL, serverImpl);
}
@@ -221,9 +217,5 @@ implements Configurable {
LOG.error(e);
throw new RuntimeException(e);
}
-
- // finally apply any remaining "hbase.mapred.output." configuration overrides
- Configuration outputOverrides = HBaseConfiguration.subset(otherConf, OUTPUT_CONF_PREFIX);
- HBaseConfiguration.merge(this.conf, outputOverrides);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
index 76ac541..e6b4802 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
@@ -48,7 +48,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.Tool;
@@ -70,6 +69,7 @@ public class VerifyReplication extends Configured implements Tool {
LogFactory.getLog(VerifyReplication.class);
public final static String NAME = "verifyrep";
+ private final static String PEER_CONFIG_PREFIX = NAME + ".peer.";
static long startTime = 0;
static long endTime = Long.MAX_VALUE;
static int versions = -1;
@@ -130,8 +130,8 @@ public class VerifyReplication extends Configured implements Tool {
final TableSplit tableSplit = (TableSplit)(context.getInputSplit());
String zkClusterKey = conf.get(NAME + ".peerQuorumAddress");
- Configuration peerConf = HBaseConfiguration.create(conf);
- ZKUtil.applyClusterKeyToConf(peerConf, zkClusterKey);
+ Configuration peerConf = HBaseConfiguration.createClusterConf(conf,
+ zkClusterKey, PEER_CONFIG_PREFIX);
TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName"));
connection = ConnectionFactory.createConnection(peerConf);
@@ -211,7 +211,8 @@ public class VerifyReplication extends Configured implements Tool {
}
}
- private static String getPeerQuorumAddress(final Configuration conf) throws IOException {
+ private static Pair<ReplicationPeerConfig, Configuration> getPeerQuorumConfig(
+ final Configuration conf) throws IOException {
ZooKeeperWatcher localZKW = null;
ReplicationPeerZKImpl peer = null;
try {
@@ -228,8 +229,8 @@ public class VerifyReplication extends Configured implements Tool {
if (pair == null) {
throw new IOException("Couldn't get peer conf!");
}
- Configuration peerConf = rp.getPeerConf(peerId).getSecond();
- return ZKUtil.getZooKeeperClusterKey(peerConf);
+
+ return pair;
} catch (ReplicationException e) {
throw new IOException(
"An error occured while trying to connect to the remove peer cluster", e);
@@ -268,9 +269,14 @@ public class VerifyReplication extends Configured implements Tool {
conf.set(NAME+".families", families);
}
- String peerQuorumAddress = getPeerQuorumAddress(conf);
+ Pair<ReplicationPeerConfig, Configuration> peerConfigPair = getPeerQuorumConfig(conf);
+ ReplicationPeerConfig peerConfig = peerConfigPair.getFirst();
+ String peerQuorumAddress = peerConfig.getClusterKey();
+ LOG.info("Peer Quorum Address: " + peerQuorumAddress + ", Peer Configuration: " +
+ peerConfig.getConfiguration());
conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
- LOG.info("Peer Quorum Address: " + peerQuorumAddress);
+ HBaseConfiguration.setWithPrefix(conf, PEER_CONFIG_PREFIX,
+ peerConfig.getConfiguration().entrySet());
conf.setInt(NAME + ".versions", versions);
LOG.info("Number of version: " + versions);
@@ -293,8 +299,9 @@ public class VerifyReplication extends Configured implements Tool {
TableMapReduceUtil.initTableMapperJob(tableName, scan,
Verifier.class, null, null, job);
+ Configuration peerClusterConf = peerConfigPair.getSecond();
// Obtain the auth token from peer cluster
- TableMapReduceUtil.initCredentialsForCluster(job, peerQuorumAddress);
+ TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf);
job.setOutputFormatClass(NullOutputFormat.class);
job.setNumReduceTasks(0);
http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
index 5c61afb..2ba1b47 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
@@ -33,7 +33,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.regionserver.RegionReplicaReplicationEndpoint;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKConfig;
/**
* Similar to {@link RegionReplicaUtil} but for the server side
@@ -148,7 +148,7 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil {
try {
if (repAdmin.getPeerConfig(REGION_REPLICA_REPLICATION_PEER) == null) {
ReplicationPeerConfig peerConfig = new ReplicationPeerConfig();
- peerConfig.setClusterKey(ZKUtil.getZooKeeperClusterKey(conf));
+ peerConfig.setClusterKey(ZKConfig.getZooKeeperClusterKey(conf));
peerConfig.setReplicationEndpointImpl(RegionReplicaReplicationEndpoint.class.getName());
repAdmin.addPeer(REGION_REPLICA_REPLICATION_PEER, peerConfig, null);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java
index 4e1599a..77d01e2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java
@@ -343,71 +343,6 @@ public class TestZooKeeper {
assertNull(ZKUtil.getDataNoWatch(zkw, "/l1/l2", null));
}
- @Test
- public void testClusterKey() throws Exception {
- testKey("server", 2181, "hbase");
- testKey("server1,server2,server3", 2181, "hbase");
- try {
- ZKUtil.transformClusterKey("2181:hbase");
- } catch (IOException ex) {
- // OK
- }
- }
-
- @Test
- public void testClusterKeyWithMultiplePorts() throws Exception {
- // server has different port than the default port
- testKey("server1:2182", 2181, "hbase", true);
- // multiple servers have their own port
- testKey("server1:2182,server2:2183,server3:2184", 2181, "hbase", true);
- // one server has no specified port, should use default port
- testKey("server1:2182,server2,server3:2184", 2181, "hbase", true);
- // the last server has no specified port, should use default port
- testKey("server1:2182,server2:2183,server3", 2181, "hbase", true);
- // multiple servers have no specified port, should use default port for those servers
- testKey("server1:2182,server2,server3:2184,server4", 2181, "hbase", true);
- // same server, different ports
- testKey("server1:2182,server1:2183,server1", 2181, "hbase", true);
- // mix of same server/different port and different server
- testKey("server1:2182,server2:2183,server1", 2181, "hbase", true);
- }
-
- private void testKey(String ensemble, int port, String znode)
- throws IOException {
- testKey(ensemble, port, znode, false); // not support multiple client ports
- }
-
- private void testKey(String ensemble, int port, String znode, Boolean multiplePortSupport)
- throws IOException {
- Configuration conf = new Configuration();
- String key = ensemble+":"+port+":"+znode;
- String ensemble2 = null;
- ZKUtil.ZKClusterKey zkClusterKey = ZKUtil.transformClusterKey(key);
- if (multiplePortSupport) {
- ensemble2 = ZKUtil.standardizeQuorumServerString(ensemble, Integer.toString(port));
- assertEquals(ensemble2, zkClusterKey.quorumString);
- }
- else {
- assertEquals(ensemble, zkClusterKey.quorumString);
- }
- assertEquals(port, zkClusterKey.clientPort);
- assertEquals(znode, zkClusterKey.znodeParent);
-
- ZKUtil.applyClusterKeyToConf(conf, key);
- assertEquals(zkClusterKey.quorumString, conf.get(HConstants.ZOOKEEPER_QUORUM));
- assertEquals(zkClusterKey.clientPort, conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, -1));
- assertEquals(zkClusterKey.znodeParent, conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
-
- String reconstructedKey = ZKUtil.getZooKeeperClusterKey(conf);
- if (multiplePortSupport) {
- String key2 = ensemble2 + ":" + port + ":" + znode;
- assertEquals(key2, reconstructedKey);
- }
- else {
- assertEquals(key, reconstructedKey);
- }
- }
-
/**
* A test for HBASE-3238
* @throws IOException A connection attempt to zk failed
http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
index e187b9b..e18220d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
@@ -24,9 +24,13 @@ import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationPeer;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -38,10 +42,12 @@ import org.junit.experimental.categories.Category;
import com.google.common.collect.Lists;
-import static org.junit.Assert.fail;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
/**
* Unit testing of ReplicationAdmin
@@ -117,7 +123,29 @@ public class TestReplicationAdmin {
admin.removePeer(ID_SECOND);
assertEquals(0, admin.getPeersCount());
}
-
+
+ /**
+ * Tests that the peer configuration used by ReplicationAdmin contains all
+ * the peer's properties.
+ */
+ @Test
+ public void testPeerConfig() throws Exception {
+ ReplicationPeerConfig config = new ReplicationPeerConfig();
+ config.setClusterKey(KEY_ONE);
+ config.getConfiguration().put("key1", "value1");
+ config.getConfiguration().put("key2", "value2");
+ admin.addPeer(ID_ONE, config, null);
+
+ List<ReplicationPeer> peers = admin.listValidReplicationPeers();
+ assertEquals(1, peers.size());
+ ReplicationPeer peerOne = peers.get(0);
+ assertNotNull(peerOne);
+ assertEquals("value1", peerOne.getConfiguration().get("key1"));
+ assertEquals("value2", peerOne.getConfiguration().get("key2"));
+
+ admin.removePeer(ID_ONE);
+ }
+
@Test
public void testAddPeerWithUnDeletedQueues() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index 52fb41c..a5a4e73 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -43,7 +43,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
@@ -115,7 +115,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
public void testCustomReplicationEndpoint() throws Exception {
// test installing a custom replication endpoint other than the default one.
admin.addPeer("testCustomReplicationEndpoint",
- new ReplicationPeerConfig().setClusterKey(ZKUtil.getZooKeeperClusterKey(conf1))
+ new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
.setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
// check whether the class has been constructed and started
@@ -157,7 +157,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
int peerCount = admin.getPeersCount();
final String id = "testReplicationEndpointReturnsFalseOnReplicate";
admin.addPeer(id,
- new ReplicationPeerConfig().setClusterKey(ZKUtil.getZooKeeperClusterKey(conf1))
+ new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
.setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()), null);
// This test is flakey and then there is so much stuff flying around in here its, hard to
// debug. Peer needs to be up for the edit to make it across. This wait on
@@ -209,7 +209,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
}
admin.addPeer(id,
- new ReplicationPeerConfig().setClusterKey(ZKUtil.getZooKeeperClusterKey(conf2))
+ new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf2))
.setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName()),
null);
@@ -234,7 +234,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
@Test (timeout=120000)
public void testWALEntryFilterFromReplicationEndpoint() throws Exception {
admin.addPeer("testWALEntryFilterFromReplicationEndpoint",
- new ReplicationPeerConfig().setClusterKey(ZKUtil.getZooKeeperClusterKey(conf1))
+ new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
.setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName()), null);
// now replicate some data.
try (Connection connection = ConnectionFactory.createConnection(conf1)) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
index f05eceb..696c130 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
@@ -27,7 +27,7 @@ import java.util.SortedSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.zookeeper.KeeperException;
import org.junit.Before;
import org.junit.Test;
@@ -202,7 +202,7 @@ public abstract class TestReplicationStateBasic {
fail("There are no connected peers, should have thrown an IllegalArgumentException");
} catch (IllegalArgumentException e) {
}
- assertEquals(KEY_ONE, ZKUtil.getZooKeeperClusterKey(rp.getPeerConf(ID_ONE).getSecond()));
+ assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(rp.getPeerConf(ID_ONE).getSecond()));
rp.removePeer(ID_ONE);
rp.peerRemoved(ID_ONE);
assertNumberOfPeers(1);
http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
index fff6c9d..4587c61 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
+import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
@@ -79,7 +80,7 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
String fakeRs = ZKUtil.joinZNode(zkw1.rsZNode, "hostname1.example.org:1234");
ZKUtil.createWithParents(zkw1, fakeRs);
ZKClusterId.setClusterId(zkw1, new ClusterId());
- return ZKUtil.getZooKeeperClusterKey(testConf);
+ return ZKConfig.getZooKeeperClusterKey(testConf);
}
@Before
@@ -94,7 +95,7 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
rq3 = ReplicationFactory.getReplicationQueues(zkw, conf, ds3);
rqc = ReplicationFactory.getReplicationQueuesClient(zkw, conf, ds1);
rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw);
- OUR_KEY = ZKUtil.getZooKeeperClusterKey(conf);
+ OUR_KEY = ZKConfig.getZooKeeperClusterKey(conf);
rqZK = new ReplicationQueuesZKImpl(zkw, conf, ds1);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
index 2231f0e..65600ff 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
@@ -53,7 +53,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.testclassification.FlakeyTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.log4j.Level;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -129,7 +129,8 @@ public class TestRegionReplicaReplicationEndpoint {
// assert peer configuration is correct
peerConfig = admin.getPeerConfig(peerId);
assertNotNull(peerConfig);
- assertEquals(peerConfig.getClusterKey(), ZKUtil.getZooKeeperClusterKey(HTU.getConfiguration()));
+ assertEquals(peerConfig.getClusterKey(), ZKConfig.getZooKeeperClusterKey(
+ HTU.getConfiguration()));
assertEquals(peerConfig.getReplicationEndpointImpl(),
RegionReplicaReplicationEndpoint.class.getName());
admin.close();
@@ -162,7 +163,8 @@ public class TestRegionReplicaReplicationEndpoint {
// assert peer configuration is correct
peerConfig = admin.getPeerConfig(peerId);
assertNotNull(peerConfig);
- assertEquals(peerConfig.getClusterKey(), ZKUtil.getZooKeeperClusterKey(HTU.getConfiguration()));
+ assertEquals(peerConfig.getClusterKey(), ZKConfig.getZooKeeperClusterKey(
+ HTU.getConfiguration()));
assertEquals(peerConfig.getReplicationEndpointImpl(),
RegionReplicaReplicationEndpoint.class.getName());
admin.close();
http://git-wip-us.apache.org/repos/asf/hbase/blob/c1e0fcc2/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKConfig.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKConfig.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKConfig.java
deleted file mode 100644
index 8f5961f..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKConfig.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.zookeeper;
-
-import java.util.Properties;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.testclassification.MiscTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category({MiscTests.class, SmallTests.class})
-public class TestZKConfig {
-
- @Test
- public void testZKConfigLoading() throws Exception {
- Configuration conf = HBaseConfiguration.create();
- // Test that we read only from the config instance
- // (i.e. via hbase-default.xml and hbase-site.xml)
- conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, 2181);
- Properties props = ZKConfig.makeZKProps(conf);
- Assert.assertEquals("Property client port should have been default from the HBase config",
- "2181",
- props.getProperty("clientPort"));
- }
-}