You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ga...@apache.org on 2015/12/10 01:49:27 UTC

[2/2] hbase git commit: HBASE-14866 VerifyReplication and ReplicationAdmin should use full peer configuration for peer connection

HBASE-14866 VerifyReplication and ReplicationAdmin should use full peer configuration for peer connection


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5dec5ad2
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5dec5ad2
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5dec5ad2

Branch: refs/heads/branch-1
Commit: 5dec5ad250322ba3dab8ff9800e82c039e4dce2e
Parents: 967873b
Author: Gary Helmling <ga...@apache.org>
Authored: Wed Dec 9 16:47:25 2015 -0800
Committer: Gary Helmling <ga...@apache.org>
Committed: Wed Dec 9 16:47:25 2015 -0800

----------------------------------------------------------------------
 .../client/replication/ReplicationAdmin.java    |  14 +-
 .../replication/ReplicationPeersZKImpl.java     |   7 +-
 .../replication/ReplicationStateZKBase.java     |   3 +-
 .../apache/hadoop/hbase/zookeeper/ZKConfig.java | 350 -------------
 .../apache/hadoop/hbase/zookeeper/ZKUtil.java   | 124 -----
 .../hadoop/hbase/zookeeper/TestZKUtil.java      |  11 -
 .../apache/hadoop/hbase/HBaseConfiguration.java |  83 +++-
 .../apache/hadoop/hbase/zookeeper/ZKConfig.java | 495 +++++++++++++++++++
 .../hadoop/hbase/TestHBaseConfiguration.java    |  10 +-
 .../hadoop/hbase/zookeeper/TestZKConfig.java    | 126 +++++
 .../hadoop/hbase/mapreduce/SyncTable.java       |  15 +-
 .../hbase/mapreduce/TableMapReduceUtil.java     |  35 +-
 .../hbase/mapreduce/TableOutputFormat.java      |  22 +-
 .../replication/VerifyReplication.java          |  25 +-
 .../hbase/util/ServerRegionReplicaUtil.java     |   4 +-
 .../org/apache/hadoop/hbase/TestZooKeeper.java  |  65 ---
 .../replication/TestReplicationAdmin.java       |  36 +-
 .../replication/TestReplicationEndpoint.java    |  10 +-
 .../replication/TestReplicationStateBasic.java  |   4 +-
 .../replication/TestReplicationStateZKImpl.java |   5 +-
 .../TestRegionReplicaReplicationEndpoint.java   |   8 +-
 .../hadoop/hbase/zookeeper/TestZKConfig.java    |  56 ---
 22 files changed, 818 insertions(+), 690 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/5dec5ad2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
index 3a83d13..24a3dcb 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
@@ -56,7 +56,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
@@ -627,7 +626,8 @@ public class ReplicationAdmin implements Closeable {
     }
   }
 
-  private List<ReplicationPeer> listValidReplicationPeers() {
+  @VisibleForTesting
+  List<ReplicationPeer> listValidReplicationPeers() {
     Map<String, ReplicationPeerConfig> peers = listPeerConfigs();
     if (peers == null || peers.size() <= 0) {
       return null;
@@ -635,18 +635,16 @@ public class ReplicationAdmin implements Closeable {
     List<ReplicationPeer> validPeers = new ArrayList<ReplicationPeer>(peers.size());
     for (Entry<String, ReplicationPeerConfig> peerEntry : peers.entrySet()) {
       String peerId = peerEntry.getKey();
-      String clusterKey = peerEntry.getValue().getClusterKey();
-      Configuration peerConf = new Configuration(this.connection.getConfiguration());
       Stat s = null;
       try {
-        ZKUtil.applyClusterKeyToConf(peerConf, clusterKey);
         Pair<ReplicationPeerConfig, Configuration> pair = this.replicationPeers.getPeerConf(peerId);
+        Configuration peerConf = pair.getSecond();
         ReplicationPeer peer = new ReplicationPeerZKImpl(peerConf, peerId, pair.getFirst());
         s =
             zkw.getRecoverableZooKeeper().exists(peerConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
               null);
         if (null == s) {
-          LOG.info(peerId + ' ' + clusterKey + " is invalid now.");
+          LOG.info(peerId + ' ' + pair.getFirst().getClusterKey() + " is invalid now.");
           continue;
         }
         validPeers.add(peer);
@@ -664,10 +662,6 @@ public class ReplicationAdmin implements Closeable {
         LOG.warn("Failed to get valid replication peers due to InterruptedException.");
         LOG.debug("Failure details to get valid replication peers.", e);
         continue;
-      } catch (IOException e) {
-        LOG.warn("Failed to get valid replication peers due to IOException.");
-        LOG.debug("Failure details to get valid replication peers.", e);
-        continue;
       }
     }
     return validPeers;

http://git-wip-us.apache.org/repos/asf/hbase/blob/5dec5ad2/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
index a7d7dda..7099bfc 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
@@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
@@ -318,11 +319,9 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
       return null;
     }
 
-    Configuration otherConf = new Configuration(this.conf);
+    Configuration otherConf;
     try {
-      if (peerConfig.getClusterKey() != null && !peerConfig.getClusterKey().isEmpty()) {
-        ZKUtil.applyClusterKeyToConf(otherConf, peerConfig.getClusterKey());
-      }
+      otherConf = HBaseConfiguration.createClusterConf(this.conf, peerConfig.getClusterKey());
     } catch (IOException e) {
       LOG.error("Can't get peer configuration for peerId=" + peerId + " because:", e);
       return null;

http://git-wip-us.apache.org/repos/asf/hbase/blob/5dec5ad2/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
index 1691b3f..4fbac0f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
+import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
@@ -69,7 +70,7 @@ public abstract class ReplicationStateZKBase {
     String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers");
     String queuesZNodeName = conf.get("zookeeper.znode.replication.rs", "rs");
     this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state");
-    this.ourClusterKey = ZKUtil.getZooKeeperClusterKey(this.conf);
+    this.ourClusterKey = ZKConfig.getZooKeeperClusterKey(this.conf);
     this.replicationZNode = ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName);
     this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName);
     this.queuesZNode = ZKUtil.joinZNode(replicationZNode, queuesZNodeName);

http://git-wip-us.apache.org/repos/asf/hbase/blob/5dec5ad2/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java
deleted file mode 100644
index 15752c2..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java
+++ /dev/null
@@ -1,350 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.zookeeper;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Properties;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
-
-/**
- * Utility methods for reading, and building the ZooKeeper configuration.
- *
- * The order and priority for reading the config are as follows:
- * (1). zoo.cfg if ""hbase.config.read.zookeeper.config" is true
- * (2). Property with "hbase.zookeeper.property." prefix from HBase XML
- * (3). other zookeeper related properties in HBASE XML
- */
-@InterfaceAudience.Private
-public class ZKConfig {
-  private static final Log LOG = LogFactory.getLog(ZKConfig.class);
-
-  private static final String VARIABLE_START = "${";
-  private static final int VARIABLE_START_LENGTH = VARIABLE_START.length();
-  private static final String VARIABLE_END = "}";
-  private static final int VARIABLE_END_LENGTH = VARIABLE_END.length();
-
-  /**
-   * Make a Properties object holding ZooKeeper config.
-   * Parses the corresponding config options from the HBase XML configs
-   * and generates the appropriate ZooKeeper properties.
-   * @param conf Configuration to read from.
-   * @return Properties holding mappings representing ZooKeeper config file.
-   */
-  public static Properties makeZKProps(Configuration conf) {
-    Properties zkProperties = makeZKPropsFromZooCfg(conf);
-
-    if (zkProperties == null) {
-      // Otherwise, use the configuration options from HBase's XML files.
-      zkProperties = makeZKPropsFromHbaseConfig(conf);
-    }
-    return zkProperties;
-  }
-
-  /**
-   * Parses the corresponding config options from the zoo.cfg file
-   * and make a Properties object holding the Zookeeper config.
-   *
-   * @param conf Configuration to read from.
-   * @return Properties holding mappings representing the ZooKeeper config file or null if
-   * the HBASE_CONFIG_READ_ZOOKEEPER_CONFIG is false or the file does not exist.
-   */
-  private static Properties makeZKPropsFromZooCfg(Configuration conf) {
-    if (conf.getBoolean(HConstants.HBASE_CONFIG_READ_ZOOKEEPER_CONFIG, false)) {
-      LOG.warn(
-          "Parsing ZooKeeper's " + HConstants.ZOOKEEPER_CONFIG_NAME +
-          " file for ZK properties " +
-          "has been deprecated. Please instead place all ZK related HBase " +
-          "configuration under the hbase-site.xml, using prefixes " +
-          "of the form '" + HConstants.ZK_CFG_PROPERTY_PREFIX + "', and " +
-          "set property '" + HConstants.HBASE_CONFIG_READ_ZOOKEEPER_CONFIG +
-          "' to false");
-      // First check if there is a zoo.cfg in the CLASSPATH. If so, simply read
-      // it and grab its configuration properties.
-      ClassLoader cl = HQuorumPeer.class.getClassLoader();
-      final InputStream inputStream =
-        cl.getResourceAsStream(HConstants.ZOOKEEPER_CONFIG_NAME);
-      if (inputStream != null) {
-        try {
-          return parseZooCfg(conf, inputStream);
-        } catch (IOException e) {
-          LOG.warn("Cannot read " + HConstants.ZOOKEEPER_CONFIG_NAME +
-                   ", loading from XML files", e);
-        }
-      }
-    } else {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Skipped reading ZK properties file '" + HConstants.ZOOKEEPER_CONFIG_NAME +
-          "' since '" + HConstants.HBASE_CONFIG_READ_ZOOKEEPER_CONFIG + "' was not set to true");
-      }
-    }
-
-    return null;
-  }
-
-  /**
-   * Make a Properties object holding ZooKeeper config.
-   * Parses the corresponding config options from the HBase XML configs
-   * and generates the appropriate ZooKeeper properties.
-   *
-   * @param conf Configuration to read from.
-   * @return Properties holding mappings representing ZooKeeper config file.
-   */
-  private static Properties makeZKPropsFromHbaseConfig(Configuration conf) {
-    Properties zkProperties = new Properties();
-
-    // Directly map all of the hbase.zookeeper.property.KEY properties.
-    // Synchronize on conf so no loading of configs while we iterate
-    synchronized (conf) {
-      for (Entry<String, String> entry : conf) {
-        String key = entry.getKey();
-        if (key.startsWith(HConstants.ZK_CFG_PROPERTY_PREFIX)) {
-          String zkKey = key.substring(HConstants.ZK_CFG_PROPERTY_PREFIX_LEN);
-          String value = entry.getValue();
-          // If the value has variables substitutions, need to do a get.
-          if (value.contains(VARIABLE_START)) {
-            value = conf.get(key);
-          }
-          zkProperties.put(zkKey, value);
-        }
-      }
-    }
-
-    // If clientPort is not set, assign the default.
-    if (zkProperties.getProperty(HConstants.CLIENT_PORT_STR) == null) {
-      zkProperties.put(HConstants.CLIENT_PORT_STR,
-          HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT);
-    }
-
-    // Create the server.X properties.
-    int peerPort = conf.getInt("hbase.zookeeper.peerport", 2888);
-    int leaderPort = conf.getInt("hbase.zookeeper.leaderport", 3888);
-
-    final String[] serverHosts = conf.getStrings(HConstants.ZOOKEEPER_QUORUM,
-                                                 HConstants.LOCALHOST);
-    String serverHost;
-    String address;
-    String key;
-    for (int i = 0; i < serverHosts.length; ++i) {
-      if (serverHosts[i].contains(":")) {
-        serverHost = serverHosts[i].substring(0, serverHosts[i].indexOf(':'));
-      } else {
-        serverHost = serverHosts[i];
-      }
-      address = serverHost + ":" + peerPort + ":" + leaderPort;
-      key = "server." + i;
-      zkProperties.put(key, address);
-    }
-
-    return zkProperties;
-  }
-
-  /**
-   * Parse ZooKeeper's zoo.cfg, injecting HBase Configuration variables in.
-   * This method is used for testing so we can pass our own InputStream.
-   * @param conf HBaseConfiguration to use for injecting variables.
-   * @param inputStream InputStream to read from.
-   * @return Properties parsed from config stream with variables substituted.
-   * @throws IOException if anything goes wrong parsing config
-   * @deprecated in 0.96 onwards. HBase will no longer rely on zoo.cfg
-   * availability.
-   */
-  @Deprecated
-  public static Properties parseZooCfg(Configuration conf,
-      InputStream inputStream) throws IOException {
-    Properties properties = new Properties();
-    try {
-      properties.load(inputStream);
-    } catch (IOException e) {
-      final String msg = "fail to read properties from "
-        + HConstants.ZOOKEEPER_CONFIG_NAME;
-      LOG.fatal(msg);
-      throw new IOException(msg, e);
-    }
-    for (Entry<Object, Object> entry : properties.entrySet()) {
-      String value = entry.getValue().toString().trim();
-      String key = entry.getKey().toString().trim();
-      StringBuilder newValue = new StringBuilder();
-      int varStart = value.indexOf(VARIABLE_START);
-      int varEnd = 0;
-      while (varStart != -1) {
-        varEnd = value.indexOf(VARIABLE_END, varStart);
-        if (varEnd == -1) {
-          String msg = "variable at " + varStart + " has no end marker";
-          LOG.fatal(msg);
-          throw new IOException(msg);
-        }
-        String variable = value.substring(varStart + VARIABLE_START_LENGTH, varEnd);
-
-        String substituteValue = System.getProperty(variable);
-        if (substituteValue == null) {
-          substituteValue = conf.get(variable);
-        }
-        if (substituteValue == null) {
-          String msg = "variable " + variable + " not set in system property "
-                     + "or hbase configs";
-          LOG.fatal(msg);
-          throw new IOException(msg);
-        }
-
-        newValue.append(substituteValue);
-
-        varEnd += VARIABLE_END_LENGTH;
-        varStart = value.indexOf(VARIABLE_START, varEnd);
-      }
-      // Special case for 'hbase.cluster.distributed' property being 'true'
-      if (key.startsWith("server.")) {
-        boolean mode =
-            conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, HConstants.DEFAULT_CLUSTER_DISTRIBUTED);
-        if (mode == HConstants.CLUSTER_IS_DISTRIBUTED && value.startsWith(HConstants.LOCALHOST)) {
-          String msg = "The server in zoo.cfg cannot be set to localhost " +
-              "in a fully-distributed setup because it won't be reachable. " +
-              "See \"Getting Started\" for more information.";
-          LOG.fatal(msg);
-          throw new IOException(msg);
-        }
-      }
-      newValue.append(value.substring(varEnd));
-      properties.setProperty(key, newValue.toString());
-    }
-    return properties;
-  }
-
-  /**
-   * Return the ZK Quorum servers string given zk properties returned by
-   * makeZKProps
-   * @param properties
-   * @return Quorum servers String
-   */
-  private static String getZKQuorumServersString(Properties properties) {
-    String clientPort = null;
-    List<String> servers = new ArrayList<String>();
-
-    // The clientPort option may come after the server.X hosts, so we need to
-    // grab everything and then create the final host:port comma separated list.
-    boolean anyValid = false;
-    for (Entry<Object,Object> property : properties.entrySet()) {
-      String key = property.getKey().toString().trim();
-      String value = property.getValue().toString().trim();
-      if (key.equals("clientPort")) {
-        clientPort = value;
-      }
-      else if (key.startsWith("server.")) {
-        String host = value.substring(0, value.indexOf(':'));
-        servers.add(host);
-        anyValid = true;
-      }
-    }
-
-    if (!anyValid) {
-      LOG.error("no valid quorum servers found in " + HConstants.ZOOKEEPER_CONFIG_NAME);
-      return null;
-    }
-
-    if (clientPort == null) {
-      LOG.error("no clientPort found in " + HConstants.ZOOKEEPER_CONFIG_NAME);
-      return null;
-    }
-
-    if (servers.isEmpty()) {
-      LOG.fatal("No servers were found in provided ZooKeeper configuration. " +
-          "HBase must have a ZooKeeper cluster configured for its " +
-          "operation. Ensure that you've configured '" +
-          HConstants.ZOOKEEPER_QUORUM + "' properly.");
-      return null;
-    }
-
-    StringBuilder hostPortBuilder = new StringBuilder();
-    for (int i = 0; i < servers.size(); ++i) {
-      String host = servers.get(i);
-      if (i > 0) {
-        hostPortBuilder.append(',');
-      }
-      hostPortBuilder.append(host);
-      hostPortBuilder.append(':');
-      hostPortBuilder.append(clientPort);
-    }
-
-    return hostPortBuilder.toString();
-  }
-
-  /**
-   * Return the ZK Quorum servers string given the specified configuration
-   *
-   * @param conf
-   * @return Quorum servers String
-   */
-  private static String getZKQuorumServersStringFromHbaseConfig(Configuration conf) {
-    String defaultClientPort = Integer.toString(
-        conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT));
-
-    // Build the ZK quorum server string with "server:clientport" list, separated by ','
-    final String[] serverHosts =
-        conf.getStrings(HConstants.ZOOKEEPER_QUORUM, HConstants.LOCALHOST);
-    return buildQuorumServerString(serverHosts, defaultClientPort);
-  }
-
-  /**
-   * Build the ZK quorum server string with "server:clientport" list, separated by ','
-   *
-   * @param serverHosts a list of servers for ZK quorum
-   * @param clientPort the default client port
-   * @return the string for a list of "server:port" separated by ","
-   */
-  public static String buildQuorumServerString(String[] serverHosts, String clientPort) {
-    StringBuilder quorumStringBuilder = new StringBuilder();
-    String serverHost;
-    for (int i = 0; i < serverHosts.length; ++i) {
-      if (serverHosts[i].contains(":")) {
-        serverHost = serverHosts[i]; // just use the port specified from the input
-      } else {
-        serverHost = serverHosts[i] + ":" + clientPort;
-      }
-      if (i > 0) {
-        quorumStringBuilder.append(',');
-      }
-      quorumStringBuilder.append(serverHost);
-    }
-    return quorumStringBuilder.toString();
-  }
-
-  /**
-   * Return the ZK Quorum servers string given the specified configuration.
-   * @param conf
-   * @return Quorum servers
-   */
-  public static String getZKQuorumServersString(Configuration conf) {
-    // First try zoo.cfg; if not applicable, then try config XML.
-    Properties zkProperties = makeZKPropsFromZooCfg(conf);
-
-    if (zkProperties != null) {
-      return getZKQuorumServersString(zkProperties);
-    }
-
-    return getZKQuorumServersStringFromHbaseConfig(conf);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/5dec5ad2/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
index ffbe2db..bf803be 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
@@ -76,7 +76,6 @@ import org.apache.zookeeper.proto.DeleteRequest;
 import org.apache.zookeeper.proto.SetDataRequest;
 import org.apache.zookeeper.server.ZooKeeperSaslServer;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.InvalidProtocolBufferException;
 
 /**
@@ -96,25 +95,6 @@ public class ZKUtil {
   public static final char ZNODE_PATH_SEPARATOR = '/';
   private static int zkDumpConnectionTimeOut;
 
-  // The Quorum for the ZK cluster can have one the following format (see examples below):
-  // (1). s1,s2,s3 (no client port in the list, the client port could be obtained from clientPort)
-  // (2). s1:p1,s2:p2,s3:p3 (with client port, which could be same or different for each server,
-  //      in this case, the clientPort would be ignored)
-  // (3). s1:p1,s2,s3:p3 (mix of (1) and (2) - if port is not specified in a server, it would use
-  //      the clientPort; otherwise, it would use the specified port)
-  @VisibleForTesting
-  public static class ZKClusterKey {
-    public String quorumString;
-    public int clientPort;
-    public String znodeParent;
-
-    ZKClusterKey(String quorumString, int clientPort, String znodeParent) {
-      this.quorumString = quorumString;
-      this.clientPort = clientPort;
-      this.znodeParent = znodeParent;
-    }
-  }
-
   /**
    * Creates a new connection to ZooKeeper, pulling settings and ensemble config
    * from the specified configuration object using methods from {@link ZKConfig}.
@@ -365,110 +345,6 @@ public class ZKUtil {
     return path.substring(path.lastIndexOf("/")+1);
   }
 
-  /**
-   * Get the key to the ZK ensemble for this configuration without
-   * adding a name at the end
-   * @param conf Configuration to use to build the key
-   * @return ensemble key without a name
-   */
-  public static String getZooKeeperClusterKey(Configuration conf) {
-    return getZooKeeperClusterKey(conf, null);
-  }
-
-  /**
-   * Get the key to the ZK ensemble for this configuration and append
-   * a name at the end
-   * @param conf Configuration to use to build the key
-   * @param name Name that should be appended at the end if not empty or null
-   * @return ensemble key with a name (if any)
-   */
-  public static String getZooKeeperClusterKey(Configuration conf, String name) {
-    String ensemble = conf.get(HConstants.ZOOKEEPER_QUORUM).replaceAll(
-        "[\\t\\n\\x0B\\f\\r]", "");
-    StringBuilder builder = new StringBuilder(ensemble);
-    builder.append(":");
-    builder.append(conf.get(HConstants.ZOOKEEPER_CLIENT_PORT));
-    builder.append(":");
-    builder.append(conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
-    if (name != null && !name.isEmpty()) {
-      builder.append(",");
-      builder.append(name);
-    }
-    return builder.toString();
-  }
-
-  /**
-   * Apply the settings in the given key to the given configuration, this is
-   * used to communicate with distant clusters
-   * @param conf configuration object to configure
-   * @param key string that contains the 3 required configuratins
-   * @throws IOException
-   */
-  public static void applyClusterKeyToConf(Configuration conf, String key)
-      throws IOException{
-    ZKClusterKey zkClusterKey = transformClusterKey(key);
-    conf.set(HConstants.ZOOKEEPER_QUORUM, zkClusterKey.quorumString);
-    conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClusterKey.clientPort);
-    conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, zkClusterKey.znodeParent);
-  }
-
-  /**
-   * Separate the given key into the three configurations it should contain:
-   * hbase.zookeeper.quorum, hbase.zookeeper.client.port
-   * and zookeeper.znode.parent
-   * @param key
-   * @return the three configuration in the described order
-   * @throws IOException
-   */
-  public static ZKClusterKey transformClusterKey(String key) throws IOException {
-    String[] parts = key.split(":");
-
-    if (parts.length == 3) {
-      return new ZKClusterKey(parts [0], Integer.parseInt(parts [1]), parts [2]);
-    }
-
-    if (parts.length > 3) {
-      // The quorum could contain client port in server:clientport format, try to transform more.
-      String zNodeParent = parts [parts.length - 1];
-      String clientPort = parts [parts.length - 2];
-
-      // The first part length is the total length minus the lengths of other parts and minus 2 ":"
-      int endQuorumIndex = key.length() - zNodeParent.length() - clientPort.length() - 2;
-      String quorumStringInput = key.substring(0, endQuorumIndex);
-      String[] serverHosts = quorumStringInput.split(",");
-
-      // The common case is that every server has its own client port specified - this means
-      // that (total parts - the ZNodeParent part - the ClientPort part) is equal to
-      // (the number of "," + 1) - "+ 1" because the last server has no ",".
-      if ((parts.length - 2) == (serverHosts.length + 1)) {
-        return new ZKClusterKey(quorumStringInput, Integer.parseInt(clientPort), zNodeParent);
-      }
-
-      // For the uncommon case that some servers has no port specified, we need to build the
-      // server:clientport list using default client port for servers without specified port.
-      return new ZKClusterKey(
-        ZKConfig.buildQuorumServerString(serverHosts, clientPort),
-        Integer.parseInt(clientPort),
-        zNodeParent);
-    }
-
-    throw new IOException("Cluster key passed " + key + " is invalid, the format should be:" +
-          HConstants.ZOOKEEPER_QUORUM + ":" + HConstants.ZOOKEEPER_CLIENT_PORT + ":"
-          + HConstants.ZOOKEEPER_ZNODE_PARENT);
-  }
-
-  /**
-   * Standardize the ZK quorum string: make it a "server:clientport" list, separated by ','
-   * @param quorumStringInput a string contains a list of servers for ZK quorum
-   * @param clientPort the default client port
-   * @return the string for a list of "server:port" separated by ","
-   */
-  @VisibleForTesting
-  public static String standardizeQuorumServerString(String quorumStringInput, String clientPort) {
-    String[] serverHosts = quorumStringInput.split(",");
-    return ZKConfig.buildQuorumServerString(serverHosts, clientPort);
-  }
-
   //
   // Existence checks and watches
   //

http://git-wip-us.apache.org/repos/asf/hbase/blob/5dec5ad2/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java
index 72de935..eb629f2 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java
@@ -41,17 +41,6 @@ import org.junit.experimental.categories.Category;
 public class TestZKUtil {
 
   @Test
-  public void testGetZooKeeperClusterKey() {
-    Configuration conf = HBaseConfiguration.create();
-    conf.set(HConstants.ZOOKEEPER_QUORUM, "\tlocalhost\n");
-    conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "3333");
-    conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "hbase");
-    String clusterKey = ZKUtil.getZooKeeperClusterKey(conf, "test");
-    Assert.assertTrue(!clusterKey.contains("\t") && !clusterKey.contains("\n"));
-    Assert.assertEquals("localhost:3333:hbase,test", clusterKey);
-  }
-  
-  @Test
   public void testCreateACL() throws ZooKeeperConnectionException, IOException {
     Configuration conf = HBaseConfiguration.create();
     conf.set(Superusers.SUPERUSER_CONF_KEY, "user1,@group1,user2,@group2,user3");

http://git-wip-us.apache.org/repos/asf/hbase/blob/5dec5ad2/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java
index 505912e..94d4483 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java
@@ -20,15 +20,16 @@ package org.apache.hadoop.hbase;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
-import java.util.Map.Entry;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
 import org.apache.hadoop.hbase.util.VersionInfo;
+import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 
 /**
  * Adds HBase configuration files to a Configuration
@@ -113,7 +114,7 @@ public class HBaseConfiguration extends Configuration {
    * @param srcConf the source configuration
    **/
   public static void merge(Configuration destConf, Configuration srcConf) {
-    for (Entry<String, String> e : srcConf) {
+    for (Map.Entry<String, String> e : srcConf) {
       destConf.set(e.getKey(), e.getValue());
     }
   }
@@ -127,7 +128,7 @@ public class HBaseConfiguration extends Configuration {
    */
   public static Configuration subset(Configuration srcConf, String prefix) {
     Configuration newConf = new Configuration(false);
-    for (Entry<String, String> entry : srcConf) {
+    for (Map.Entry<String, String> entry : srcConf) {
       if (entry.getKey().startsWith(prefix)) {
         String newKey = entry.getKey().substring(prefix.length());
         // avoid entries that would produce an empty key
@@ -140,6 +141,18 @@ public class HBaseConfiguration extends Configuration {
   }
 
   /**
+   * Sets all the entries in the provided {@code Map<String, String>} as properties in the
+   * given {@code Configuration}.  Each property will have the specified prefix prepended,
+   * so that the configuration entries are keyed by {@code prefix + entry.getKey()}.
+   */
+  public static void setWithPrefix(Configuration conf, String prefix,
+                                   Iterable<Map.Entry<String, String>> properties) {
+    for (Map.Entry<String, String> entry : properties) {
+      conf.set(prefix + entry.getKey(), entry.getValue());
+    }
+  }
+
+  /**
    * @return whether to show HBase Configuration in servlet
    */
   public static boolean isShowConfInServlet() {
@@ -233,7 +246,67 @@ public class HBaseConfiguration extends Configuration {
     return passwd;
   }
 
-  /** For debugging.  Dump configurations to system output as xml format.
+  /**
+   * Generates a {@link Configuration} instance by applying the ZooKeeper cluster key
+   * to the base Configuration.  Note that additional configuration properties may be needed
+   * for a remote cluster, so it is preferable to use
+   * {@link #createClusterConf(Configuration, String, String)}.
+   *
+   * @param baseConf the base configuration to use, containing prefixed override properties
+   * @param clusterKey the ZooKeeper quorum cluster key to apply, or {@code null} if none
+   *
+   * @return the merged configuration with override properties and cluster key applied
+   *
+   * @see #createClusterConf(Configuration, String, String)
+   */
+  public static Configuration createClusterConf(Configuration baseConf, String clusterKey)
+      throws IOException {
+    return createClusterConf(baseConf, clusterKey, null);
+  }
+
+  /**
+   * Generates a {@link Configuration} instance by applying property overrides prefixed by
+   * a cluster profile key to the base Configuration.  Override properties are extracted by
+   * the {@link #subset(Configuration, String)} method, then merged on top of the base
+   * Configuration and returned.
+   *
+   * @param baseConf the base configuration to use, containing prefixed override properties
+   * @param clusterKey the ZooKeeper quorum cluster key to apply, or {@code null} if none
+   * @param overridePrefix the property key prefix to match for override properties,
+   *     or {@code null} if none
+   * @return the merged configuration with override properties and cluster key applied
+   */
+  public static Configuration createClusterConf(Configuration baseConf, String clusterKey,
+                                                String overridePrefix) throws IOException {
+    Configuration clusterConf = HBaseConfiguration.create(baseConf);
+    if (clusterKey != null && !clusterKey.isEmpty()) {
+      applyClusterKeyToConf(clusterConf, clusterKey);
+    }
+
+    if (overridePrefix != null && !overridePrefix.isEmpty()) {
+      Configuration clusterSubset = HBaseConfiguration.subset(clusterConf, overridePrefix);
+      HBaseConfiguration.merge(clusterConf, clusterSubset);
+    }
+    return clusterConf;
+  }
+
+  /**
+   * Apply the settings in the given key to the given configuration, this is
+   * used to communicate with distant clusters
+   * @param conf configuration object to configure
+   * @param key string that contains the 3 required configurations
+   * @throws IOException
+   */
+  private static void applyClusterKeyToConf(Configuration conf, String key)
+      throws IOException{
+    ZKConfig.ZKClusterKey zkClusterKey = ZKConfig.transformClusterKey(key);
+    conf.set(HConstants.ZOOKEEPER_QUORUM, zkClusterKey.getQuorumString());
+    conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClusterKey.getClientPort());
+    conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, zkClusterKey.getZnodeParent());
+  }
+
+  /**
+   * For debugging.  Dump configurations to system output as xml format.
    * Master and RS configurations can also be dumped using
    * http services. e.g. "curl http://master:16010/dump"
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/5dec5ad2/hbase-common/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java
new file mode 100644
index 0000000..787b5cc
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java
@@ -0,0 +1,495 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Utility methods for reading, and building the ZooKeeper configuration.
+ *
+ * The order and priority for reading the config are as follows:
+ * (1). zoo.cfg if "hbase.config.read.zookeeper.config" is true
+ * (2). Property with "hbase.zookeeper.property." prefix from HBase XML
+ * (3). other zookeeper related properties in HBASE XML
+ */
+@InterfaceAudience.Private
+public final class ZKConfig {
+  private static final Log LOG = LogFactory.getLog(ZKConfig.class);
+
+  private static final String VARIABLE_START = "${";
+  private static final int VARIABLE_START_LENGTH = VARIABLE_START.length();
+  private static final String VARIABLE_END = "}";
+  private static final int VARIABLE_END_LENGTH = VARIABLE_END.length();
+
+  private ZKConfig() {
+  }
+
+  /**
+   * Make a Properties object holding ZooKeeper config.
+   * Parses the corresponding config options from the HBase XML configs
+   * and generates the appropriate ZooKeeper properties.
+   * @param conf Configuration to read from.
+   * @return Properties holding mappings representing ZooKeeper config file.
+   */
+  public static Properties makeZKProps(Configuration conf) {
+    Properties zkProperties = makeZKPropsFromZooCfg(conf);
+
+    if (zkProperties == null) {
+      // Otherwise, use the configuration options from HBase's XML files.
+      zkProperties = makeZKPropsFromHbaseConfig(conf);
+    }
+    return zkProperties;
+  }
+
+  /**
+   * Parses the corresponding config options from the zoo.cfg file
+   * and make a Properties object holding the Zookeeper config.
+   *
+   * @param conf Configuration to read from.
+   * @return Properties holding mappings representing the ZooKeeper config file or null if
+   * the HBASE_CONFIG_READ_ZOOKEEPER_CONFIG is false or the file does not exist.
+   */
+  private static Properties makeZKPropsFromZooCfg(Configuration conf) {
+    if (conf.getBoolean(HConstants.HBASE_CONFIG_READ_ZOOKEEPER_CONFIG, false)) {
+      LOG.warn(
+          "Parsing ZooKeeper's " + HConstants.ZOOKEEPER_CONFIG_NAME +
+          " file for ZK properties " +
+          "has been deprecated. Please instead place all ZK related HBase " +
+          "configuration under the hbase-site.xml, using prefixes " +
+          "of the form '" + HConstants.ZK_CFG_PROPERTY_PREFIX + "', and " +
+          "set property '" + HConstants.HBASE_CONFIG_READ_ZOOKEEPER_CONFIG +
+          "' to false");
+      // First check if there is a zoo.cfg in the CLASSPATH. If so, simply read
+      // it and grab its configuration properties.
+      ClassLoader cl = ZKConfig.class.getClassLoader();
+      final InputStream inputStream =
+        cl.getResourceAsStream(HConstants.ZOOKEEPER_CONFIG_NAME);
+      if (inputStream != null) {
+        try {
+          return parseZooCfg(conf, inputStream);
+        } catch (IOException e) {
+          LOG.warn("Cannot read " + HConstants.ZOOKEEPER_CONFIG_NAME +
+                   ", loading from XML files", e);
+        }
+      }
+    } else {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Skipped reading ZK properties file '" + HConstants.ZOOKEEPER_CONFIG_NAME +
+          "' since '" + HConstants.HBASE_CONFIG_READ_ZOOKEEPER_CONFIG + "' was not set to true");
+      }
+    }
+
+    return null;
+  }
+
+  /**
+   * Make a Properties object holding ZooKeeper config.
+   * Parses the corresponding config options from the HBase XML configs
+   * and generates the appropriate ZooKeeper properties.
+   *
+   * @param conf Configuration to read from.
+   * @return Properties holding mappings representing ZooKeeper config file.
+   */
+  private static Properties makeZKPropsFromHbaseConfig(Configuration conf) {
+    Properties zkProperties = new Properties();
+
+    // Directly map all of the hbase.zookeeper.property.KEY properties.
+    // Synchronize on conf so no loading of configs while we iterate
+    synchronized (conf) {
+      for (Entry<String, String> entry : conf) {
+        String key = entry.getKey();
+        if (key.startsWith(HConstants.ZK_CFG_PROPERTY_PREFIX)) {
+          String zkKey = key.substring(HConstants.ZK_CFG_PROPERTY_PREFIX_LEN);
+          String value = entry.getValue();
+          // If the value has variables substitutions, need to do a get.
+          if (value.contains(VARIABLE_START)) {
+            value = conf.get(key);
+          }
+          zkProperties.put(zkKey, value);
+        }
+      }
+    }
+
+    // If clientPort is not set, assign the default.
+    if (zkProperties.getProperty(HConstants.CLIENT_PORT_STR) == null) {
+      zkProperties.put(HConstants.CLIENT_PORT_STR,
+          HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT);
+    }
+
+    // Create the server.X properties.
+    int peerPort = conf.getInt("hbase.zookeeper.peerport", 2888);
+    int leaderPort = conf.getInt("hbase.zookeeper.leaderport", 3888);
+
+    final String[] serverHosts = conf.getStrings(HConstants.ZOOKEEPER_QUORUM,
+                                                 HConstants.LOCALHOST);
+    String serverHost;
+    String address;
+    String key;
+    for (int i = 0; i < serverHosts.length; ++i) {
+      if (serverHosts[i].contains(":")) {
+        serverHost = serverHosts[i].substring(0, serverHosts[i].indexOf(':'));
+      } else {
+        serverHost = serverHosts[i];
+      }
+      address = serverHost + ":" + peerPort + ":" + leaderPort;
+      key = "server." + i;
+      zkProperties.put(key, address);
+    }
+
+    return zkProperties;
+  }
+
+  /**
+   * Parse ZooKeeper's zoo.cfg, injecting HBase Configuration variables in.
+   * This method is used for testing so we can pass our own InputStream.
+   * @param conf HBaseConfiguration to use for injecting variables.
+   * @param inputStream InputStream to read from.
+   * @return Properties parsed from config stream with variables substituted.
+   * @throws IOException if anything goes wrong parsing config
+   * @deprecated in 0.96 onwards. HBase will no longer rely on zoo.cfg
+   * availability.
+   */
+  @Deprecated
+  public static Properties parseZooCfg(Configuration conf,
+      InputStream inputStream) throws IOException {
+    Properties properties = new Properties();
+    try {
+      properties.load(inputStream);
+    } catch (IOException e) {
+      final String msg = "fail to read properties from "
+        + HConstants.ZOOKEEPER_CONFIG_NAME;
+      LOG.fatal(msg);
+      throw new IOException(msg, e);
+    }
+    for (Entry<Object, Object> entry : properties.entrySet()) {
+      String value = entry.getValue().toString().trim();
+      String key = entry.getKey().toString().trim();
+      StringBuilder newValue = new StringBuilder();
+      int varStart = value.indexOf(VARIABLE_START);
+      int varEnd = 0;
+      while (varStart != -1) {
+        varEnd = value.indexOf(VARIABLE_END, varStart);
+        if (varEnd == -1) {
+          String msg = "variable at " + varStart + " has no end marker";
+          LOG.fatal(msg);
+          throw new IOException(msg);
+        }
+        String variable = value.substring(varStart + VARIABLE_START_LENGTH, varEnd);
+
+        String substituteValue = System.getProperty(variable);
+        if (substituteValue == null) {
+          substituteValue = conf.get(variable);
+        }
+        if (substituteValue == null) {
+          String msg = "variable " + variable + " not set in system property "
+                     + "or hbase configs";
+          LOG.fatal(msg);
+          throw new IOException(msg);
+        }
+
+        newValue.append(substituteValue);
+
+        varEnd += VARIABLE_END_LENGTH;
+        varStart = value.indexOf(VARIABLE_START, varEnd);
+      }
+      // Special case for 'hbase.cluster.distributed' property being 'true'
+      if (key.startsWith("server.")) {
+        boolean mode =
+            conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, HConstants.DEFAULT_CLUSTER_DISTRIBUTED);
+        if (mode == HConstants.CLUSTER_IS_DISTRIBUTED && value.startsWith(HConstants.LOCALHOST)) {
+          String msg = "The server in zoo.cfg cannot be set to localhost " +
+              "in a fully-distributed setup because it won't be reachable. " +
+              "See \"Getting Started\" for more information.";
+          LOG.fatal(msg);
+          throw new IOException(msg);
+        }
+      }
+      newValue.append(value.substring(varEnd));
+      properties.setProperty(key, newValue.toString());
+    }
+    return properties;
+  }
+
+  /**
+   * Return the ZK Quorum servers string given zk properties returned by
+   * makeZKProps
+   * @param properties
+   * @return Quorum servers String
+   */
+  private static String getZKQuorumServersString(Properties properties) {
+    String clientPort = null;
+    List<String> servers = new ArrayList<String>();
+
+    // The clientPort option may come after the server.X hosts, so we need to
+    // grab everything and then create the final host:port comma separated list.
+    boolean anyValid = false;
+    for (Entry<Object,Object> property : properties.entrySet()) {
+      String key = property.getKey().toString().trim();
+      String value = property.getValue().toString().trim();
+      if (key.equals("clientPort")) {
+        clientPort = value;
+      }
+      else if (key.startsWith("server.")) {
+        String host = value.substring(0, value.indexOf(':'));
+        servers.add(host);
+        anyValid = true;
+      }
+    }
+
+    if (!anyValid) {
+      LOG.error("no valid quorum servers found in " + HConstants.ZOOKEEPER_CONFIG_NAME);
+      return null;
+    }
+
+    if (clientPort == null) {
+      LOG.error("no clientPort found in " + HConstants.ZOOKEEPER_CONFIG_NAME);
+      return null;
+    }
+
+    if (servers.isEmpty()) {
+      LOG.fatal("No servers were found in provided ZooKeeper configuration. " +
+          "HBase must have a ZooKeeper cluster configured for its " +
+          "operation. Ensure that you've configured '" +
+          HConstants.ZOOKEEPER_QUORUM + "' properly.");
+      return null;
+    }
+
+    StringBuilder hostPortBuilder = new StringBuilder();
+    for (int i = 0; i < servers.size(); ++i) {
+      String host = servers.get(i);
+      if (i > 0) {
+        hostPortBuilder.append(',');
+      }
+      hostPortBuilder.append(host);
+      hostPortBuilder.append(':');
+      hostPortBuilder.append(clientPort);
+    }
+
+    return hostPortBuilder.toString();
+  }
+
+  /**
+   * Return the ZK Quorum servers string given the specified configuration
+   *
+   * @param conf
+   * @return Quorum servers String
+   */
+  private static String getZKQuorumServersStringFromHbaseConfig(Configuration conf) {
+    String defaultClientPort = Integer.toString(
+        conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT));
+
+    // Build the ZK quorum server string with "server:clientport" list, separated by ','
+    final String[] serverHosts =
+        conf.getStrings(HConstants.ZOOKEEPER_QUORUM, HConstants.LOCALHOST);
+    return buildZKQuorumServerString(serverHosts, defaultClientPort);
+  }
+
+  /**
+   * Return the ZK Quorum servers string given the specified configuration.
+   * @param conf
+   * @return Quorum servers
+   */
+  public static String getZKQuorumServersString(Configuration conf) {
+    // First try zoo.cfg; if not applicable, then try config XML.
+    Properties zkProperties = makeZKPropsFromZooCfg(conf);
+
+    if (zkProperties != null) {
+      return getZKQuorumServersString(zkProperties);
+    }
+
+    return getZKQuorumServersStringFromHbaseConfig(conf);
+  }
+
+  /**
+   * Build the ZK quorum server string with "server:clientport" list, separated by ','
+   *
+   * @param serverHosts a list of servers for ZK quorum
+   * @param clientPort the default client port
+   * @return the string for a list of "server:port" separated by ","
+   */
+  public static String buildZKQuorumServerString(String[] serverHosts, String clientPort) {
+    StringBuilder quorumStringBuilder = new StringBuilder();
+    String serverHost;
+    for (int i = 0; i < serverHosts.length; ++i) {
+      if (serverHosts[i].contains(":")) {
+        serverHost = serverHosts[i]; // just use the port specified from the input
+      } else {
+        serverHost = serverHosts[i] + ":" + clientPort;
+      }
+      if (i > 0) {
+        quorumStringBuilder.append(',');
+      }
+      quorumStringBuilder.append(serverHost);
+    }
+    return quorumStringBuilder.toString();
+  }
+
+  /**
+   * Verifies that the given key matches the expected format for a ZooKeeper cluster key.
+   * The Quorum for the ZK cluster can have one of the following formats (see examples below):
+   *
+   * <ol>
+   *   <li>s1,s2,s3 (no client port in the list, the client port could be obtained from
+   *       clientPort)</li>
+   *   <li>s1:p1,s2:p2,s3:p3 (with client port, which could be same or different for each server,
+   *       in this case, the clientPort would be ignored)</li>
+   *   <li>s1:p1,s2,s3:p3 (mix of (1) and (2) - if port is not specified in a server, it would use
+   *       the clientPort; otherwise, it would use the specified port)</li>
+   * </ol>
+   *
+   * @param key the cluster key to validate
+   * @throws IOException if the key could not be parsed
+   */
+  public static void validateClusterKey(String key) throws IOException {
+    transformClusterKey(key);
+  }
+
+  /**
+   * Separate the given key into the three configurations it should contain:
+   * hbase.zookeeper.quorum, hbase.zookeeper.client.port
+   * and zookeeper.znode.parent
+   * @param key
+   * @return the three configurations in the described order
+   * @throws IOException
+   */
+  public static ZKClusterKey transformClusterKey(String key) throws IOException {
+    String[] parts = key.split(":");
+
+    if (parts.length == 3) {
+      return new ZKClusterKey(parts [0], Integer.parseInt(parts [1]), parts [2]);
+    }
+
+    if (parts.length > 3) {
+      // The quorum could contain client port in server:clientport format, try to transform more.
+      String zNodeParent = parts [parts.length - 1];
+      String clientPort = parts [parts.length - 2];
+
+      // The first part length is the total length minus the lengths of other parts and minus 2 ":"
+      int endQuorumIndex = key.length() - zNodeParent.length() - clientPort.length() - 2;
+      String quorumStringInput = key.substring(0, endQuorumIndex);
+      String[] serverHosts = quorumStringInput.split(",");
+
+      // The common case is that every server has its own client port specified - this means
+      // that (total parts - the ZNodeParent part - the ClientPort part) is equal to
+      // (the number of "," + 1) - "+ 1" because the last server has no ",".
+      if ((parts.length - 2) == (serverHosts.length + 1)) {
+        return new ZKClusterKey(quorumStringInput, Integer.parseInt(clientPort), zNodeParent);
+      }
+
+      // For the uncommon case that some servers have no port specified, we need to build the
+      // server:clientport list using default client port for servers without specified port.
+      return new ZKClusterKey(
+          buildZKQuorumServerString(serverHosts, clientPort),
+          Integer.parseInt(clientPort),
+          zNodeParent);
+    }
+
+    throw new IOException("Cluster key passed " + key + " is invalid, the format should be:" +
+        HConstants.ZOOKEEPER_QUORUM + ":" + HConstants.ZOOKEEPER_CLIENT_PORT + ":"
+        + HConstants.ZOOKEEPER_ZNODE_PARENT);
+  }
+
+  /**
+   * Get the key to the ZK ensemble for this configuration without
+   * adding a name at the end
+   * @param conf Configuration to use to build the key
+   * @return ensemble key without a name
+   */
+  public static String getZooKeeperClusterKey(Configuration conf) {
+    return getZooKeeperClusterKey(conf, null);
+  }
+
+  /**
+   * Get the key to the ZK ensemble for this configuration and append
+   * a name at the end
+   * @param conf Configuration to use to build the key
+   * @param name Name that should be appended at the end if not empty or null
+   * @return ensemble key with a name (if any)
+   */
+  public static String getZooKeeperClusterKey(Configuration conf, String name) {
+    String ensemble = conf.get(HConstants.ZOOKEEPER_QUORUM).replaceAll(
+        "[\\t\\n\\x0B\\f\\r]", "");
+    StringBuilder builder = new StringBuilder(ensemble);
+    builder.append(":");
+    builder.append(conf.get(HConstants.ZOOKEEPER_CLIENT_PORT));
+    builder.append(":");
+    builder.append(conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
+    if (name != null && !name.isEmpty()) {
+      builder.append(",");
+      builder.append(name);
+    }
+    return builder.toString();
+  }
+
+  /**
+   * Standardize the ZK quorum string: make it a "server:clientport" list, separated by ','
+   * @param quorumStringInput a string containing a list of servers for ZK quorum
+   * @param clientPort the default client port
+   * @return the string for a list of "server:port" separated by ","
+   */
+  @VisibleForTesting
+  public static String standardizeZKQuorumServerString(String quorumStringInput,
+      String clientPort) {
+    String[] serverHosts = quorumStringInput.split(",");
+    return buildZKQuorumServerString(serverHosts, clientPort);
+  }
+
+  // The Quorum for the ZK cluster can have one of the following formats (see examples below):
+  // (1). s1,s2,s3 (no client port in the list, the client port could be obtained from clientPort)
+  // (2). s1:p1,s2:p2,s3:p3 (with client port, which could be same or different for each server,
+  //      in this case, the clientPort would be ignored)
+  // (3). s1:p1,s2,s3:p3 (mix of (1) and (2) - if port is not specified in a server, it would use
+  //      the clientPort; otherwise, it would use the specified port)
+  @VisibleForTesting
+  public static class ZKClusterKey {
+    private String quorumString;
+    private int clientPort;
+    private String znodeParent;
+
+    ZKClusterKey(String quorumString, int clientPort, String znodeParent) {
+      this.quorumString = quorumString;
+      this.clientPort = clientPort;
+      this.znodeParent = znodeParent;
+    }
+
+    public String getQuorumString() {
+      return quorumString;
+    }
+
+    public int getClientPort() {
+      return clientPort;
+    }
+
+    public String getZnodeParent() {
+      return znodeParent;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/5dec5ad2/hbase-common/src/test/java/org/apache/hadoop/hbase/TestHBaseConfiguration.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestHBaseConfiguration.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestHBaseConfiguration.java
index bbddb60..f8b60fd 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestHBaseConfiguration.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestHBaseConfiguration.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
@@ -27,10 +28,12 @@ import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.List;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -72,8 +75,11 @@ public class TestHBaseConfiguration {
     String prefix = "hbase.mapred.output.";
     conf.set("hbase.security.authentication", "kerberos");
     conf.set("hbase.regionserver.kerberos.principal", "hbasesource");
-    conf.set(prefix + "hbase.regionserver.kerberos.principal", "hbasedest");
-    conf.set(prefix, "shouldbemissing");
+    HBaseConfiguration.setWithPrefix(conf, prefix,
+        ImmutableMap.of(
+            "hbase.regionserver.kerberos.principal", "hbasedest",
+            "", "shouldbemissing")
+            .entrySet());
 
     Configuration subsetConf = HBaseConfiguration.subset(conf, prefix);
     assertNull(subsetConf.get(prefix + "hbase.regionserver.kerberos.principal"));

http://git-wip-us.apache.org/repos/asf/hbase/blob/5dec5ad2/hbase-common/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKConfig.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKConfig.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKConfig.java
new file mode 100644
index 0000000..7879aea
--- /dev/null
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKConfig.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@Category({MiscTests.class, SmallTests.class})
+public class TestZKConfig {
+
+  @Test
+  public void testZKConfigLoading() throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    // Test that we read only from the config instance
+    // (i.e. via hbase-default.xml and hbase-site.xml)
+    conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, 2181);
+    Properties props = ZKConfig.makeZKProps(conf);
+    assertEquals("Property client port should have been default from the HBase config",
+                        "2181",
+                        props.getProperty("clientPort"));
+  }
+
+  @Test
+  public void testGetZooKeeperClusterKey() {
+    Configuration conf = HBaseConfiguration.create();
+    conf.set(HConstants.ZOOKEEPER_QUORUM, "\tlocalhost\n");
+    conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "3333");
+    conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "hbase");
+    String clusterKey = ZKConfig.getZooKeeperClusterKey(conf, "test");
+    assertTrue(!clusterKey.contains("\t") && !clusterKey.contains("\n"));
+    assertEquals("localhost:3333:hbase,test", clusterKey);
+  }
+
+  @Test
+  public void testClusterKey() throws Exception {
+    testKey("server", 2181, "hbase");
+    testKey("server1,server2,server3", 2181, "hbase");
+    try {
+      ZKConfig.validateClusterKey("2181:hbase");
+    } catch (IOException ex) {
+      // OK
+    }
+  }
+
+  @Test
+  public void testClusterKeyWithMultiplePorts() throws Exception {
+    // server has different port than the default port
+    testKey("server1:2182", 2181, "hbase", true);
+    // multiple servers have their own port
+    testKey("server1:2182,server2:2183,server3:2184", 2181, "hbase", true);
+    // one server has no specified port, should use default port
+    testKey("server1:2182,server2,server3:2184", 2181, "hbase", true);
+    // the last server has no specified port, should use default port
+    testKey("server1:2182,server2:2183,server3", 2181, "hbase", true);
+    // multiple servers have no specified port, should use default port for those servers
+    testKey("server1:2182,server2,server3:2184,server4", 2181, "hbase", true);
+    // same server, different ports
+    testKey("server1:2182,server1:2183,server1", 2181, "hbase", true);
+    // mix of same server/different port and different server
+    testKey("server1:2182,server2:2183,server1", 2181, "hbase", true);
+  }
+
+  private void testKey(String ensemble, int port, String znode)
+      throws IOException {
+    testKey(ensemble, port, znode, false); // not support multiple client ports
+  }
+
+  private void testKey(String ensemble, int port, String znode, Boolean multiplePortSupport)
+      throws IOException {
+    Configuration conf = new Configuration();
+    String key = ensemble+":"+port+":"+znode;
+    String ensemble2 = null;
+    ZKConfig.ZKClusterKey zkClusterKey = ZKConfig.transformClusterKey(key);
+    if (multiplePortSupport) {
+      ensemble2 = ZKConfig.standardizeZKQuorumServerString(ensemble,
+          Integer.toString(port));
+      assertEquals(ensemble2, zkClusterKey.getQuorumString());
+    }
+    else {
+      assertEquals(ensemble, zkClusterKey.getQuorumString());
+    }
+    assertEquals(port, zkClusterKey.getClientPort());
+    assertEquals(znode, zkClusterKey.getZnodeParent());
+
+    conf = HBaseConfiguration.createClusterConf(conf, key);
+    assertEquals(zkClusterKey.getQuorumString(), conf.get(HConstants.ZOOKEEPER_QUORUM));
+    assertEquals(zkClusterKey.getClientPort(), conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, -1));
+    assertEquals(zkClusterKey.getZnodeParent(), conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
+
+    String reconstructedKey = ZKConfig.getZooKeeperClusterKey(conf);
+    if (multiplePortSupport) {
+      String key2 = ensemble2 + ":" + port + ":" + znode;
+      assertEquals(key2, reconstructedKey);
+    }
+    else {
+      assertEquals(key, reconstructedKey);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/5dec5ad2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java
index 3495ca9..23fd10e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java
@@ -43,7 +43,6 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
@@ -174,8 +173,9 @@ public class SyncTable extends Configured implements Tool {
       
       Configuration conf = context.getConfiguration();
       sourceHashDir = new Path(conf.get(SOURCE_HASH_DIR_CONF_KEY));
-      sourceConnection = openConnection(conf, SOURCE_ZK_CLUSTER_CONF_KEY);
-      targetConnection = openConnection(conf, TARGET_ZK_CLUSTER_CONF_KEY);
+      sourceConnection = openConnection(conf, SOURCE_ZK_CLUSTER_CONF_KEY, null);
+      targetConnection = openConnection(conf, TARGET_ZK_CLUSTER_CONF_KEY,
+          TableOutputFormat.OUTPUT_CONF_PREFIX);
       sourceTable = openTable(sourceConnection, conf, SOURCE_TABLE_CONF_KEY);
       targetTable = openTable(targetConnection, conf, TARGET_TABLE_CONF_KEY);
       dryRun = conf.getBoolean(SOURCE_TABLE_CONF_KEY, false);
@@ -196,13 +196,12 @@ public class SyncTable extends Configured implements Tool {
       targetHasher = new HashTable.ResultHasher();
     }
   
-    private static Connection openConnection(Configuration conf, String zkClusterConfKey)
+    private static Connection openConnection(Configuration conf, String zkClusterConfKey,
+                                             String configPrefix)
       throws IOException {
-        Configuration clusterConf = new Configuration(conf);
         String zkCluster = conf.get(zkClusterConfKey);
-        if (zkCluster != null) {
-          ZKUtil.applyClusterKeyToConf(clusterConf, zkCluster);
-        }
+        Configuration clusterConf = HBaseConfiguration.createClusterConf(conf,
+            zkCluster, configPrefix);
         return ConnectionFactory.createConnection(clusterConf);
     }
     

http://git-wip-us.apache.org/repos/asf/hbase/blob/5dec5ad2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
index fdd68ce..1614883 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
@@ -26,6 +26,7 @@ import java.util.*;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipFile;
 
+import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -49,12 +50,11 @@ import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.security.token.TokenUtil;
 import org.apache.hadoop.hbase.util.Base64;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.StringUtils;
-import com.google.protobuf.InvalidProtocolBufferException;
 
 /**
  * Utility for {@link TableMapper} and {@link TableReducer}
@@ -475,12 +475,8 @@ public class TableMapReduceUtil {
         String quorumAddress = job.getConfiguration().get(TableOutputFormat.QUORUM_ADDRESS);
         User user = userProvider.getCurrent();
         if (quorumAddress != null) {
-          Configuration peerConf = HBaseConfiguration.create(job.getConfiguration());
-          ZKUtil.applyClusterKeyToConf(peerConf, quorumAddress);
-          // apply any "hbase.mapred.output." configuration overrides
-          Configuration outputOverrides =
-              HBaseConfiguration.subset(peerConf, TableOutputFormat.OUTPUT_CONF_PREFIX);
-          HBaseConfiguration.merge(peerConf, outputOverrides);
+          Configuration peerConf = HBaseConfiguration.createClusterConf(job.getConfiguration(),
+              quorumAddress, TableOutputFormat.OUTPUT_CONF_PREFIX);
           Connection peerConn = ConnectionFactory.createConnection(peerConf);
           try {
             TokenUtil.addTokenForJob(peerConn, user, job);
@@ -513,15 +509,30 @@ public class TableMapReduceUtil {
    * @param job The job that requires the permission.
    * @param quorumAddress string that contains the 3 required configuratins
    * @throws IOException When the authentication token cannot be obtained.
+   * @deprecated Since 1.2.0, use {@link #initCredentialsForCluster(Job, Configuration)} instead.
    */
+  @Deprecated
   public static void initCredentialsForCluster(Job job, String quorumAddress)
       throws IOException {
+    Configuration peerConf = HBaseConfiguration.createClusterConf(job.getConfiguration(),
+        quorumAddress);
+    initCredentialsForCluster(job, peerConf);
+  }
+
+  /**
+   * Obtain an authentication token, for the specified cluster, on behalf of the current user
+   * and add it to the credentials for the given map reduce job.
+   *
+   * @param job The job that requires the permission.
+   * @param conf The configuration to use in connecting to the peer cluster
+   * @throws IOException When the authentication token cannot be obtained.
+   */
+  public static void initCredentialsForCluster(Job job, Configuration conf)
+      throws IOException {
     UserProvider userProvider = UserProvider.instantiate(job.getConfiguration());
     if (userProvider.isHBaseSecurityEnabled()) {
       try {
-        Configuration peerConf = HBaseConfiguration.create(job.getConfiguration());
-        ZKUtil.applyClusterKeyToConf(peerConf, quorumAddress);
-        Connection peerConn = ConnectionFactory.createConnection(peerConf);
+        Connection peerConn = ConnectionFactory.createConnection(conf);
         try {
           TokenUtil.addTokenForJob(peerConn, userProvider.getCurrent(), job);
         } finally {
@@ -670,7 +681,7 @@ public class TableMapReduceUtil {
     // If passed a quorum/ensemble address, pass it on to TableOutputFormat.
     if (quorumAddress != null) {
       // Calling this will validate the format
-      ZKUtil.transformClusterKey(quorumAddress);
+      ZKConfig.validateClusterKey(quorumAddress);
       conf.set(TableOutputFormat.QUORUM_ADDRESS,quorumAddress);
     }
     if (serverClass != null && serverImpl != null) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/5dec5ad2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
index 190962e..5904f9c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
@@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
@@ -191,22 +190,19 @@ implements Configurable {
 
   @Override
   public void setConf(Configuration otherConf) {
-    this.conf = HBaseConfiguration.create(otherConf);
-
-    String tableName = this.conf.get(OUTPUT_TABLE);
+    String tableName = otherConf.get(OUTPUT_TABLE);
     if(tableName == null || tableName.length() <= 0) {
       throw new IllegalArgumentException("Must specify table name");
     }
 
-    String address = this.conf.get(QUORUM_ADDRESS);
-    int zkClientPort = this.conf.getInt(QUORUM_PORT, 0);
-    String serverClass = this.conf.get(REGION_SERVER_CLASS);
-    String serverImpl = this.conf.get(REGION_SERVER_IMPL);
+    String address = otherConf.get(QUORUM_ADDRESS);
+    int zkClientPort = otherConf.getInt(QUORUM_PORT, 0);
+    String serverClass = otherConf.get(REGION_SERVER_CLASS);
+    String serverImpl = otherConf.get(REGION_SERVER_IMPL);
 
     try {
-      if (address != null) {
-        ZKUtil.applyClusterKeyToConf(this.conf, address);
-      }
+      this.conf = HBaseConfiguration.createClusterConf(otherConf, address, OUTPUT_CONF_PREFIX);
+
       if (serverClass != null) {
         this.conf.set(HConstants.REGION_SERVER_IMPL, serverImpl);
       }
@@ -217,9 +213,5 @@ implements Configurable {
       LOG.error(e);
       throw new RuntimeException(e);
     }
-
-    // finally apply any remaining "hbase.mapred.output." configuration overrides
-    Configuration outputOverrides = HBaseConfiguration.subset(otherConf, OUTPUT_CONF_PREFIX);
-    HBaseConfiguration.merge(this.conf, outputOverrides);
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/5dec5ad2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
index 9bd2a6c..75dfe9e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
@@ -47,7 +47,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.hadoop.util.Tool;
@@ -69,6 +68,7 @@ public class VerifyReplication extends Configured implements Tool {
       LogFactory.getLog(VerifyReplication.class);
 
   public final static String NAME = "verifyrep";
+  private final static String PEER_CONFIG_PREFIX = NAME + ".peer.";
   static long startTime = 0;
   static long endTime = Long.MAX_VALUE;
   static int versions = -1;
@@ -126,8 +126,8 @@ public class VerifyReplication extends Configured implements Tool {
           @Override
           public Void connect(HConnection conn) throws IOException {
             String zkClusterKey = conf.get(NAME + ".peerQuorumAddress");
-            Configuration peerConf = HBaseConfiguration.create(conf);
-            ZKUtil.applyClusterKeyToConf(peerConf, zkClusterKey);
+            Configuration peerConf = HBaseConfiguration.createClusterConf(conf,
+                zkClusterKey, PEER_CONFIG_PREFIX);
 
             TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName"));
             replicatedTable = new HTable(peerConf, tableName);
@@ -203,7 +203,8 @@ public class VerifyReplication extends Configured implements Tool {
     }
   }
 
-  private static String getPeerQuorumAddress(final Configuration conf) throws IOException {
+  private static Pair<ReplicationPeerConfig, Configuration> getPeerQuorumConfig(
+      final Configuration conf) throws IOException {
     ZooKeeperWatcher localZKW = null;
     ReplicationPeerZKImpl peer = null;
     try {
@@ -220,8 +221,8 @@ public class VerifyReplication extends Configured implements Tool {
       if (pair == null) {
         throw new IOException("Couldn't get peer conf!");
       }
-      Configuration peerConf = rp.getPeerConf(peerId).getSecond();
-      return ZKUtil.getZooKeeperClusterKey(peerConf);
+
+      return pair;
     } catch (ReplicationException e) {
       throw new IOException(
           "An error occured while trying to connect to the remove peer cluster", e);
@@ -260,9 +261,14 @@ public class VerifyReplication extends Configured implements Tool {
       conf.set(NAME+".families", families);
     }
 
-    String peerQuorumAddress = getPeerQuorumAddress(conf);
+    Pair<ReplicationPeerConfig, Configuration> peerConfigPair = getPeerQuorumConfig(conf);
+    ReplicationPeerConfig peerConfig = peerConfigPair.getFirst();
+    String peerQuorumAddress = peerConfig.getClusterKey();
+    LOG.info("Peer Quorum Address: " + peerQuorumAddress + ", Peer Configuration: " +
+        peerConfig.getConfiguration());
     conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
-    LOG.info("Peer Quorum Address: " + peerQuorumAddress);
+    HBaseConfiguration.setWithPrefix(conf, PEER_CONFIG_PREFIX,
+        peerConfig.getConfiguration().entrySet());
 
     conf.setInt(NAME + ".versions", versions);
     LOG.info("Number of version: " + versions);
@@ -285,8 +291,9 @@ public class VerifyReplication extends Configured implements Tool {
     TableMapReduceUtil.initTableMapperJob(tableName, scan,
         Verifier.class, null, null, job);
 
+    Configuration peerClusterConf = peerConfigPair.getSecond();
     // Obtain the auth token from peer cluster
-    TableMapReduceUtil.initCredentialsForCluster(job, peerQuorumAddress);
+    TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf);
 
     job.setOutputFormatClass(NullOutputFormat.class);
     job.setNumReduceTasks(0);

http://git-wip-us.apache.org/repos/asf/hbase/blob/5dec5ad2/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
index 5c61afb..2ba1b47 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
@@ -33,7 +33,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.regionserver.RegionReplicaReplicationEndpoint;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 
 /**
  * Similar to {@link RegionReplicaUtil} but for the server side
@@ -148,7 +148,7 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil {
     try {
       if (repAdmin.getPeerConfig(REGION_REPLICA_REPLICATION_PEER) == null) {
         ReplicationPeerConfig peerConfig = new ReplicationPeerConfig();
-        peerConfig.setClusterKey(ZKUtil.getZooKeeperClusterKey(conf));
+        peerConfig.setClusterKey(ZKConfig.getZooKeeperClusterKey(conf));
         peerConfig.setReplicationEndpointImpl(RegionReplicaReplicationEndpoint.class.getName());
         repAdmin.addPeer(REGION_REPLICA_REPLICATION_PEER, peerConfig, null);
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/5dec5ad2/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java
index a756652..3441aa6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java
@@ -345,71 +345,6 @@ public class TestZooKeeper {
     assertNull(ZKUtil.getDataNoWatch(zkw, "/l1/l2", null));
   }
 
-  @Test
-  public void testClusterKey() throws Exception {
-    testKey("server", 2181, "hbase");
-    testKey("server1,server2,server3", 2181, "hbase");
-    try {
-      ZKUtil.transformClusterKey("2181:hbase");
-    } catch (IOException ex) {
-      // OK
-    }
-  }
-
-  @Test
-  public void testClusterKeyWithMultiplePorts() throws Exception {
-    // server has different port than the default port
-    testKey("server1:2182", 2181, "hbase", true);
-    // multiple servers have their own port
-    testKey("server1:2182,server2:2183,server3:2184", 2181, "hbase", true);
-    // one server has no specified port, should use default port
-    testKey("server1:2182,server2,server3:2184", 2181, "hbase", true);
-    // the last server has no specified port, should use default port
-    testKey("server1:2182,server2:2183,server3", 2181, "hbase", true);
-    // multiple servers have no specified port, should use default port for those servers
-    testKey("server1:2182,server2,server3:2184,server4", 2181, "hbase", true);
-    // same server, different ports
-    testKey("server1:2182,server1:2183,server1", 2181, "hbase", true);
-    // mix of same server/different port and different server
-    testKey("server1:2182,server2:2183,server1", 2181, "hbase", true);
-  }
-
-  private void testKey(String ensemble, int port, String znode)
-      throws IOException {
-    testKey(ensemble, port, znode, false); // not support multiple client ports
-  }
-
-  private void testKey(String ensemble, int port, String znode, Boolean multiplePortSupport)
-      throws IOException {
-    Configuration conf = new Configuration();
-    String key = ensemble+":"+port+":"+znode;
-    String ensemble2 = null;
-    ZKUtil.ZKClusterKey zkClusterKey = ZKUtil.transformClusterKey(key);
-    if (multiplePortSupport) {
-      ensemble2 = ZKUtil.standardizeQuorumServerString(ensemble, Integer.toString(port));
-      assertEquals(ensemble2, zkClusterKey.quorumString);
-    }
-    else {
-      assertEquals(ensemble, zkClusterKey.quorumString);
-    }
-    assertEquals(port, zkClusterKey.clientPort);
-    assertEquals(znode, zkClusterKey.znodeParent);
-
-    ZKUtil.applyClusterKeyToConf(conf, key);
-    assertEquals(zkClusterKey.quorumString, conf.get(HConstants.ZOOKEEPER_QUORUM));
-    assertEquals(zkClusterKey.clientPort, conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, -1));
-    assertEquals(zkClusterKey.znodeParent, conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
-
-    String reconstructedKey = ZKUtil.getZooKeeperClusterKey(conf);
-    if (multiplePortSupport) {
-      String key2 = ensemble2 + ":" + port + ":" + znode;
-      assertEquals(key2, reconstructedKey);
-    }
-    else {
-      assertEquals(key, reconstructedKey);
-    }
-  }
-
   /**
    * A test for HBASE-3238
    * @throws IOException A connection attempt to zk failed

http://git-wip-us.apache.org/repos/asf/hbase/blob/5dec5ad2/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
index d5e0e31..119cee5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
@@ -24,9 +24,13 @@ import java.util.TreeMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationPeer;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -37,10 +41,12 @@ import org.junit.experimental.categories.Category;
 
 import com.google.common.collect.Lists;
 
-import static org.junit.Assert.fail;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 
 /**
  * Unit testing of ReplicationAdmin
@@ -135,7 +141,7 @@ public class TestReplicationAdmin {
     }
     repQueues.removeQueue(ID_ONE);
     assertEquals(0, repQueues.getAllQueues().size());
-    
+
     // add recovered queue for ID_ONE
     repQueues.addLog(ID_ONE + "-server2", "file1");
     try {
@@ -149,6 +155,28 @@ public class TestReplicationAdmin {
   }
 
   /**
+   * Tests that the peer configuration used by ReplicationAdmin contains all
+   * the peer's properties.
+   */
+  @Test
+  public void testPeerConfig() throws Exception {
+    ReplicationPeerConfig config = new ReplicationPeerConfig();
+    config.setClusterKey(KEY_ONE);
+    config.getConfiguration().put("key1", "value1");
+    config.getConfiguration().put("key2", "value2");
+    admin.addPeer(ID_ONE, config, null);
+
+    List<ReplicationPeer> peers = admin.listValidReplicationPeers();
+    assertEquals(1, peers.size());
+    ReplicationPeer peerOne = peers.get(0);
+    assertNotNull(peerOne);
+    assertEquals("value1", peerOne.getConfiguration().get("key1"));
+    assertEquals("value2", peerOne.getConfiguration().get("key2"));
+
+    admin.removePeer(ID_ONE);
+  }
+
+  /**
    * basic checks that when we add a peer that it is enabled, and that we can disable
    * @throws Exception
    */