You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2016/10/28 05:28:32 UTC

hbase git commit: HBASE-16835 Revisit the zookeeper usage at client side

Repository: hbase
Updated Branches:
  refs/heads/master e108a4f81 -> 3283bc7c9


HBASE-16835 Revisit the zookeeper usage at client side


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/3283bc7c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/3283bc7c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/3283bc7c

Branch: refs/heads/master
Commit: 3283bc7c91b61b9687ba6f98e1a28ef6ab81e432
Parents: e108a4f
Author: zhangduo <zh...@apache.org>
Authored: Fri Oct 28 09:50:20 2016 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Fri Oct 28 13:27:41 2016 +0800

----------------------------------------------------------------------
 hbase-client/pom.xml                            |  12 +
 .../hbase/client/AsyncConnectionImpl.java       |   2 +-
 .../hadoop/hbase/client/AsyncRegistry.java      |  71 ++++++
 .../hadoop/hbase/client/ClusterRegistry.java    |  41 ---
 .../hbase/client/ClusterRegistryFactory.java    |   6 +-
 .../hadoop/hbase/client/ZKAsyncRegistry.java    | 252 +++++++++++++++++++
 .../hadoop/hbase/client/ZKClusterRegistry.java  |  78 ------
 .../hbase/zookeeper/RecoverableZooKeeper.java   |   6 +-
 .../hbase/client/TestAsyncTableNoncedRetry.java |   3 +-
 .../hbase/client/TestZKAsyncRegistry.java       |  78 ++++++
 pom.xml                                         |  80 +++---
 11 files changed, 473 insertions(+), 156 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/3283bc7c/hbase-client/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-client/pom.xml b/hbase-client/pom.xml
index f3e27bc..f61c6d0 100644
--- a/hbase-client/pom.xml
+++ b/hbase-client/pom.xml
@@ -203,6 +203,18 @@
       <artifactId>mockito-all</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-recipes</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-framework</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-client</artifactId>
+    </dependency>
   </dependencies>
 
   <profiles>

http://git-wip-us.apache.org/repos/asf/hbase/blob/3283bc7c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 7a8fd9a..121a16b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -66,7 +66,7 @@ class AsyncConnectionImpl implements AsyncConnection {
 
   private final User user;
 
-  private final ClusterRegistry registry;
+  private final AsyncRegistry registry;
 
   private final String clusterId;
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/3283bc7c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistry.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistry.java
new file mode 100644
index 0000000..731cf09
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegistry.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.Closeable;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Implementations hold cluster information such as this cluster's id, location of hbase:meta, etc.
+ * Everything on the client side that may be related to zookeeper is placed here.
+ * <p>
+ * Most methods are executed asynchronously except getClusterId. It is executed synchronously
+ * and should be called only once, during initialization.
+ * <p>
+ * Internal use only.
+ */
+@InterfaceAudience.Private
+interface AsyncRegistry extends Closeable {
+
+  /**
+   * Get the location of meta region.
+   */
+  CompletableFuture<RegionLocations> getMetaRegionLocation();
+
+  /**
+   * Should only be called once.
+   * <p>
+   * The upper layer should store this value somewhere as it will not change any more.
+   */
+  String getClusterId();
+
+  /**
+   * Get the number of 'running' regionservers.
+   */
+  CompletableFuture<Integer> getCurrentNrHRS();
+
+  /**
+   * Get the address of HMaster.
+   */
+  CompletableFuture<ServerName> getMasterAddress();
+
+  /**
+   * Get the info port of HMaster.
+   */
+  CompletableFuture<Integer> getMasterInfoPort();
+
+  /**
+   * Closes this instance and releases any system resources associated with it.
+   */
+  @Override
+  void close();
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/3283bc7c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterRegistry.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterRegistry.java
deleted file mode 100644
index c1918a7..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterRegistry.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import java.io.Closeable;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
-/**
- * Implementations hold cluster information such as this cluster's id.
- * <p>
- * Internal use only.
- */
-@InterfaceAudience.Private
-interface ClusterRegistry extends Closeable {
-
-  /**
-   * Should only be called once.
-   * <p>
-   * The upper layer should store this value somewhere as it will not be change any more.
-   */
-  String getClusterId();
-
-  @Override
-  void close();
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/3283bc7c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterRegistryFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterRegistryFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterRegistryFactory.java
index a6b3e39..48bfb18 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterRegistryFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterRegistryFactory.java
@@ -35,9 +35,9 @@ final class ClusterRegistryFactory {
   /**
    * @return The cluster registry implementation to use.
    */
-  static ClusterRegistry getRegistry(Configuration conf) {
-    Class<? extends ClusterRegistry> clazz =
-        conf.getClass(REGISTRY_IMPL_CONF_KEY, ZKClusterRegistry.class, ClusterRegistry.class);
+  static AsyncRegistry getRegistry(Configuration conf) {
+    Class<? extends AsyncRegistry> clazz =
+        conf.getClass(REGISTRY_IMPL_CONF_KEY, ZKAsyncRegistry.class, AsyncRegistry.class);
     return ReflectionUtils.newInstance(clazz, conf);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/3283bc7c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java
new file mode 100644
index 0000000..c76aa3e
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZK_SESSION_TIMEOUT;
+import static org.apache.hadoop.hbase.HConstants.ZK_SESSION_TIMEOUT;
+import static org.apache.hadoop.hbase.HRegionInfo.DEFAULT_REPLICA_ID;
+import static org.apache.hadoop.hbase.HRegionInfo.FIRST_META_REGIONINFO;
+import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForDefaultReplica;
+import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForReplica;
+import static org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil.lengthOfPBMagic;
+import static org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper.removeMetaData;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.commons.lang.mutable.MutableInt;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.BackgroundPathable;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ClusterId;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.zookeeper.ZKConfig;
+import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Fetch the cluster registry data from zookeeper on every call, nothing is cached in memory.
+ * {@link #getClusterId()} reads synchronously, all the other getters read in the background.
+ */
+@InterfaceAudience.Private
+class ZKAsyncRegistry implements AsyncRegistry {
+
+  private static final Log LOG = LogFactory.getLog(ZKAsyncRegistry.class);
+
+  private final CuratorFramework zk;
+
+  private final ZNodePaths znodePaths;
+
+  ZKAsyncRegistry(Configuration conf) {
+    this.znodePaths = new ZNodePaths(conf);
+    int zkSessionTimeout = conf.getInt(ZK_SESSION_TIMEOUT, DEFAULT_ZK_SESSION_TIMEOUT);
+    int zkRetry = conf.getInt("zookeeper.recovery.retry", 3);
+    int zkRetryIntervalMs = conf.getInt("zookeeper.recovery.retry.intervalmill", 1000);
+    this.zk = CuratorFrameworkFactory.builder()
+        .connectString(ZKConfig.getZKQuorumServersString(conf)).sessionTimeoutMs(zkSessionTimeout)
+        .retryPolicy(new RetryNTimes(zkRetry, zkRetryIntervalMs))
+        .threadFactory(
+          Threads.newDaemonThreadFactory(String.format("ZKClusterRegistry-0x%08x", hashCode())))
+        .build();
+    this.zk.start();
+  }
+
+  @Override
+  public String getClusterId() {
+    try {
+      byte[] data = zk.getData().forPath(znodePaths.clusterIdZNode);
+      if (data == null || data.length == 0) {
+        return null;
+      }
+      data = removeMetaData(data);
+      return ClusterId.parseFrom(data).toString();
+    } catch (Exception e) {
+      LOG.warn("failed to get cluster id", e);
+      return null;
+    }
+  }
+
+  @Override
+  public void close() {
+    zk.close();
+  }
+
+  private interface CuratorEventProcessor<T> {
+    T process(CuratorEvent event) throws Exception;
+  }
+
+  private static <T> CompletableFuture<T> exec(BackgroundPathable<?> opBuilder, String path,
+      CuratorEventProcessor<T> processor) {
+    CompletableFuture<T> future = new CompletableFuture<>();
+    try {
+      opBuilder.inBackground((client, event) -> {
+        try {
+          future.complete(processor.process(event));
+        } catch (Exception e) {
+          future.completeExceptionally(e);
+        }
+      }).withUnhandledErrorListener((msg, e) -> future.completeExceptionally(e)).forPath(path);
+    } catch (Exception e) {
+      future.completeExceptionally(e);
+    }
+    return future;
+  }
+
+  private static ZooKeeperProtos.MetaRegionServer getMetaProto(CuratorEvent event)
+      throws IOException {
+    byte[] data = event.getData();
+    if (data == null || data.length == 0) {
+      return null;
+    }
+    data = removeMetaData(data);
+    int prefixLen = lengthOfPBMagic();
+    return ZooKeeperProtos.MetaRegionServer.parser().parseFrom(data, prefixLen,
+      data.length - prefixLen);
+  }
+
+  private static void tryComplete(MutableInt remaining, HRegionLocation[] locs,
+      CompletableFuture<RegionLocations> future) {
+    remaining.decrement();
+    if (remaining.intValue() > 0) {
+      return;
+    }
+    future.complete(new RegionLocations(locs));
+  }
+
+  private Pair<RegionState.State, ServerName> getStateAndServerName(
+      ZooKeeperProtos.MetaRegionServer proto) {
+    RegionState.State state;
+    if (proto.hasState()) {
+      state = RegionState.State.convert(proto.getState());
+    } else {
+      state = RegionState.State.OPEN;
+    }
+    HBaseProtos.ServerName snProto = proto.getServer();
+    return Pair.newPair(state,
+      ServerName.valueOf(snProto.getHostName(), snProto.getPort(), snProto.getStartCode()));
+  }
+
+  @Override
+  public CompletableFuture<RegionLocations> getMetaRegionLocation() {
+    CompletableFuture<RegionLocations> future = new CompletableFuture<>();
+    HRegionLocation[] locs = new HRegionLocation[znodePaths.metaReplicaZNodes.size()];
+    MutableInt remaining = new MutableInt(locs.length);
+    znodePaths.metaReplicaZNodes.forEach((replicaId, path) -> {
+      if (replicaId == DEFAULT_REPLICA_ID) {
+        exec(zk.getData(), path, ZKAsyncRegistry::getMetaProto).whenComplete((proto, error) -> {
+          if (error != null) {
+            future.completeExceptionally(error);
+            return;
+          }
+          if (proto == null) {
+            future.completeExceptionally(new IOException("Meta znode is null"));
+            return;
+          }
+          Pair<RegionState.State, ServerName> stateAndServerName = getStateAndServerName(proto);
+          if (stateAndServerName.getFirst() != RegionState.State.OPEN) {
+            future.completeExceptionally(
+              new IOException("Meta region is in state " + stateAndServerName.getFirst()));
+            return;
+          }
+          locs[DEFAULT_REPLICA_ID] = new HRegionLocation(
+              getRegionInfoForDefaultReplica(FIRST_META_REGIONINFO),
+              stateAndServerName.getSecond());
+          tryComplete(remaining, locs, future);
+        });
+      } else {
+        exec(zk.getData(), path, ZKAsyncRegistry::getMetaProto).whenComplete((proto, error) -> {
+          if (future.isDone()) {
+            return;
+          }
+          if (error != null) {
+            LOG.warn("Failed to fetch " + path, error);
+            locs[replicaId] = null;
+          } else if (proto == null) {
+            LOG.warn("Meta znode for replica " + replicaId + " is null");
+            locs[replicaId] = null;
+          } else {
+            Pair<RegionState.State, ServerName> stateAndServerName = getStateAndServerName(proto);
+            if (stateAndServerName.getFirst() != RegionState.State.OPEN) {
+              LOG.warn("Meta region for replica " + replicaId + " is in state "
+                  + stateAndServerName.getFirst());
+              locs[replicaId] = null;
+            } else {
+              locs[replicaId] = new HRegionLocation(
+                  getRegionInfoForReplica(FIRST_META_REGIONINFO, replicaId),
+                  stateAndServerName.getSecond());
+            }
+          }
+          tryComplete(remaining, locs, future);
+        });
+      }
+    });
+    return future;
+  }
+
+  private static int getCurrentNrHRS(CuratorEvent event) {
+    Stat stat = event.getStat();
+    return stat != null ? stat.getNumChildren() : 0;
+  }
+
+  @Override
+  public CompletableFuture<Integer> getCurrentNrHRS() {
+    return exec(zk.checkExists(), znodePaths.rsZNode, ZKAsyncRegistry::getCurrentNrHRS);
+  }
+
+  private static ZooKeeperProtos.Master getMasterProto(CuratorEvent event) throws IOException {
+    byte[] data = event.getData();
+    if (data == null || data.length == 0) {
+      return null;
+    }
+    data = removeMetaData(data);
+    int prefixLen = lengthOfPBMagic();
+    return ZooKeeperProtos.Master.parser().parseFrom(data, prefixLen, data.length - prefixLen);
+  }
+
+  @Override
+  public CompletableFuture<ServerName> getMasterAddress() {
+    return exec(zk.getData(), znodePaths.masterAddressZNode, ZKAsyncRegistry::getMasterProto)
+        .thenApply(proto -> {
+          if (proto == null) {
+            return null;
+          }
+          HBaseProtos.ServerName snProto = proto.getMaster();
+          return ServerName.valueOf(snProto.getHostName(), snProto.getPort(),
+            snProto.getStartCode());
+        });
+  }
+
+  @Override
+  public CompletableFuture<Integer> getMasterInfoPort() {
+    return exec(zk.getData(), znodePaths.masterAddressZNode, ZKAsyncRegistry::getMasterProto)
+        .thenApply(proto -> proto != null ? proto.getInfoPort() : 0);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/3283bc7c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKClusterRegistry.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKClusterRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKClusterRegistry.java
deleted file mode 100644
index d385136..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKClusterRegistry.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZK_SESSION_TIMEOUT;
-import static org.apache.hadoop.hbase.HConstants.ZK_SESSION_TIMEOUT;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.ClusterId;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
-import org.apache.hadoop.hbase.zookeeper.ZKConfig;
-import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
-
-/**
- * Cache the cluster registry data in memory and use zk watcher to update. The only exception is
- * {@link #getClusterId()}, it will fetch the data from zk directly.
- */
-@InterfaceAudience.Private
-class ZKClusterRegistry implements ClusterRegistry {
-
-  private static final Log LOG = LogFactory.getLog(ZKClusterRegistry.class);
-
-  private final RecoverableZooKeeper zk;
-
-  private final ZNodePaths znodePaths;
-
-  ZKClusterRegistry(Configuration conf) throws IOException {
-    this.znodePaths = new ZNodePaths(conf);
-    int zkSessionTimeout = conf.getInt(ZK_SESSION_TIMEOUT, DEFAULT_ZK_SESSION_TIMEOUT);
-    int zkRetry = conf.getInt("zookeeper.recovery.retry", 3);
-    int zkRetryIntervalMs = conf.getInt("zookeeper.recovery.retry.intervalmill", 1000);
-    this.zk = new RecoverableZooKeeper(ZKConfig.getZKQuorumServersString(conf), zkSessionTimeout,
-        null, zkRetry, zkRetryIntervalMs);
-  }
-
-  @Override
-  public String getClusterId() {
-    try {
-      byte[] data = zk.getData(znodePaths.clusterIdZNode, false, null);
-      if (data == null || data.length == 0) {
-        return null;
-      }
-      return ClusterId.parseFrom(data).toString();
-    } catch (Exception e) {
-      LOG.warn("failed to get cluster id", e);
-      return null;
-    }
-  }
-
-  @Override
-  public void close() {
-    try {
-      zk.close();
-    } catch (InterruptedException e) {
-      LOG.warn("close zookeeper failed", e);
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/3283bc7c/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java
index 371279e..14532cf 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java
@@ -352,7 +352,7 @@ public class RecoverableZooKeeper {
       while (true) {
         try {
           byte[] revData = checkZk().getData(path, watcher, stat);
-          return this.removeMetaData(revData);
+          return removeMetaData(revData);
         } catch (KeeperException e) {
           switch (e.code()) {
             case CONNECTIONLOSS:
@@ -384,7 +384,7 @@ public class RecoverableZooKeeper {
       while (true) {
         try {
           byte[] revData = checkZk().getData(path, watch, stat);
-          return this.removeMetaData(revData);
+          return removeMetaData(revData);
         } catch (KeeperException e) {
           switch (e.code()) {
             case CONNECTIONLOSS:
@@ -707,7 +707,7 @@ public class RecoverableZooKeeper {
     return null;
   }
 
-  public byte[] removeMetaData(byte[] data) {
+  public static byte[] removeMetaData(byte[] data) {
     if(data == null || data.length == 0) {
       return data;
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/3283bc7c/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
index 8fc0f60..840f844 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableNoncedRetry.java
@@ -17,7 +17,8 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
 import java.util.concurrent.ExecutionException;

http://git-wip-us.apache.org/repos/asf/hbase/blob/3283bc7c/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java
new file mode 100644
index 0000000..a94df61
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.HConstants.META_REPLICAS_NUM;
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.IntStream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestZKAsyncRegistry {
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static ZKAsyncRegistry REGISTRY;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    TEST_UTIL.getConfiguration().setInt(META_REPLICAS_NUM, 3);
+    TEST_UTIL.startMiniCluster(3);
+    TEST_UTIL.waitUntilAllRegionsAssigned(TableName.META_TABLE_NAME);
+    REGISTRY = new ZKAsyncRegistry(TEST_UTIL.getConfiguration());
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    IOUtils.closeQuietly(REGISTRY);
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void test() throws InterruptedException, ExecutionException, IOException {
+    assertEquals(TEST_UTIL.getHBaseCluster().getClusterStatus().getClusterId(),
+      REGISTRY.getClusterId());
+    assertEquals(TEST_UTIL.getHBaseCluster().getClusterStatus().getServersSize(),
+      REGISTRY.getCurrentNrHRS().get().intValue());
+    assertEquals(TEST_UTIL.getHBaseCluster().getMaster().getServerName(),
+      REGISTRY.getMasterAddress().get());
+    assertEquals(-1, REGISTRY.getMasterInfoPort().get().intValue());
+    RegionLocations locs = REGISTRY.getMetaRegionLocation().get();
+    assertEquals(3, locs.getRegionLocations().length);
+    IntStream.range(0, 3).forEach(i -> {
+      HRegionLocation loc = locs.getRegionLocation(i);
+      assertNotNull(loc);
+      assertTrue(loc.getRegionInfo().getTable().equals(TableName.META_TABLE_NAME));
+      assertEquals(i, loc.getRegionInfo().getReplicaId());
+    });
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/3283bc7c/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ad77f05..65b8adb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1230,6 +1230,7 @@
     <bouncycastle.version>1.46</bouncycastle.version>
     <kerby.version>1.0.0-RC2</kerby.version>
     <commons-crypto.version>1.0.0</commons-crypto.version>
+    <curator.version>2.11.0</curator.version>
     <!-- Plugin Dependencies -->
     <maven.assembly.version>2.4</maven.assembly.version>
     <maven.antrun.version>1.8</maven.antrun.version>
@@ -1786,39 +1787,60 @@
         <artifactId>disruptor</artifactId>
         <version>${disruptor.version}</version>
       </dependency>
-        <dependency>
+      <dependency>
         <groupId>net.spy</groupId>
         <artifactId>spymemcached</artifactId>
         <version>${spy.version}</version>
         <optional>true</optional>
-    </dependency>
-     <dependency>
-       <groupId>org.bouncycastle</groupId>
-       <artifactId>bcprov-jdk16</artifactId>
-       <version>${bouncycastle.version}</version>
-       <scope>test</scope>
-     </dependency>
-     <dependency>
-       <groupId>org.apache.kerby</groupId>
-       <artifactId>kerb-client</artifactId>
-       <version>${kerby.version}</version>
-     </dependency>
-     <dependency>
-       <groupId>org.apache.kerby</groupId>
-       <artifactId>kerb-simplekdc</artifactId>
-       <version>${kerby.version}</version>
-     </dependency>
-     <dependency>
-       <groupId>org.apache.commons</groupId>
-       <artifactId>commons-crypto</artifactId>
-       <version>${commons-crypto.version}</version>
-       <exclusions>
-         <exclusion>
-           <groupId>net.java.dev.jna</groupId>
-           <artifactId>jna</artifactId>
-         </exclusion>
-       </exclusions>
-     </dependency>
+      </dependency>
+      <dependency>
+        <groupId>org.bouncycastle</groupId>
+        <artifactId>bcprov-jdk16</artifactId>
+        <version>${bouncycastle.version}</version>
+        <scope>test</scope>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.kerby</groupId>
+        <artifactId>kerb-client</artifactId>
+        <version>${kerby.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.kerby</groupId>
+        <artifactId>kerb-simplekdc</artifactId>
+        <version>${kerby.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.commons</groupId>
+        <artifactId>commons-crypto</artifactId>
+        <version>${commons-crypto.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>net.java.dev.jna</groupId>
+            <artifactId>jna</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.curator</groupId>
+        <artifactId>curator-recipes</artifactId>
+        <version>${curator.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.curator</groupId>
+        <artifactId>curator-framework</artifactId>
+        <version>${curator.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.curator</groupId>
+        <artifactId>curator-client</artifactId>
+        <version>${curator.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
     </dependencies>
   </dependencyManagement>
   <!-- Dependencies needed by subprojects -->