You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2017/12/03 00:31:24 UTC

hbase git commit: HBASE-19399 Purge curator dependency from hbase-client

Repository: hbase
Updated Branches:
  refs/heads/master 8354a563f -> 7a5b07830


HBASE-19399 Purge curator dependency from hbase-client


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7a5b0783
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7a5b0783
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7a5b0783

Branch: refs/heads/master
Commit: 7a5b0783068415eaca70ea2ca938ecbfe3bed30f
Parents: 8354a56
Author: zhangduo <zh...@apache.org>
Authored: Sun Dec 3 08:30:30 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Sun Dec 3 08:30:30 2017 +0800

----------------------------------------------------------------------
 .../hbase/testclassification/ZKTests.java       |  21 ++
 hbase-client/pom.xml                            |   8 -
 .../hadoop/hbase/client/ZKAsyncRegistry.java    | 111 ++----
 .../hbase/zookeeper/ReadOnlyZKClient.java       | 347 +++++++++++++++++++
 .../hbase/client/TestZKAsyncRegistry.java       |  77 ++--
 .../hbase/zookeeper/MiniZooKeeperCluster.java   |   4 +
 .../hadoop/hbase/zookeeper/ZKMainServer.java    |   6 +-
 .../hbase/zookeeper/TestInstancePending.java    |   6 +-
 .../hbase/zookeeper/TestReadOnlyZKClient.java   | 141 ++++++++
 .../hadoop/hbase/zookeeper/TestZKMetrics.java   |   6 +-
 .../hadoop/hbase/zookeeper/TestZKUtil.java      |   8 +-
 .../hadoop/hbase/zookeeper/TestZKWatcher.java   |   3 +-
 .../src/test/resources/log4j.properties         |  68 ++++
 pom.xml                                         |  15 +
 14 files changed, 687 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/7a5b0783/hbase-annotations/src/test/java/org/apache/hadoop/hbase/testclassification/ZKTests.java
----------------------------------------------------------------------
diff --git a/hbase-annotations/src/test/java/org/apache/hadoop/hbase/testclassification/ZKTests.java b/hbase-annotations/src/test/java/org/apache/hadoop/hbase/testclassification/ZKTests.java
new file mode 100644
index 0000000..ad869fa
--- /dev/null
+++ b/hbase-annotations/src/test/java/org/apache/hadoop/hbase/testclassification/ZKTests.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.testclassification;
+
+public interface ZKTests {
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/7a5b0783/hbase-client/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-client/pom.xml b/hbase-client/pom.xml
index cc112d4..45393b5 100644
--- a/hbase-client/pom.xml
+++ b/hbase-client/pom.xml
@@ -196,14 +196,6 @@
       <scope>test</scope>
     </dependency>
     <dependency>
-      <groupId>org.apache.curator</groupId>
-      <artifactId>curator-framework</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.curator</groupId>
-      <artifactId>curator-client</artifactId>
-    </dependency>
-    <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-crypto</artifactId>
       <version>${commons-crypto.version}</version>

http://git-wip-us.apache.org/repos/asf/hbase/blob/7a5b0783/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java
index 6d49b7f..bd8325e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKAsyncRegistry.java
@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZK_SESSION_TIMEOUT;
-import static org.apache.hadoop.hbase.HConstants.ZK_SESSION_TIMEOUT;
 import static org.apache.hadoop.hbase.client.RegionInfo.DEFAULT_REPLICA_ID;
 import static org.apache.hadoop.hbase.client.RegionInfoBuilder.FIRST_META_REGIONINFO;
 import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForDefaultReplica;
@@ -28,16 +26,10 @@ import static org.apache.hadoop.hbase.zookeeper.ZKMetadata.removeMetaData;
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.api.BackgroundPathable;
-import org.apache.curator.framework.api.CuratorEvent;
-import org.apache.curator.retry.RetryNTimes;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ClusterId;
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -45,16 +37,14 @@ import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.master.RegionState;
-import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.zookeeper.ZKConfig;
+import org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient;
 import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.zookeeper.data.Stat;
 
+import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
 
 /**
  * Fetch the registry data from zookeeper.
@@ -64,53 +54,36 @@ class ZKAsyncRegistry implements AsyncRegistry {
 
   private static final Log LOG = LogFactory.getLog(ZKAsyncRegistry.class);
 
-  private final CuratorFramework zk;
+  private final ReadOnlyZKClient zk;
 
   private final ZNodePaths znodePaths;
 
   ZKAsyncRegistry(Configuration conf) {
     this.znodePaths = new ZNodePaths(conf);
-    int zkSessionTimeout = conf.getInt(ZK_SESSION_TIMEOUT, DEFAULT_ZK_SESSION_TIMEOUT);
-    int zkRetry = conf.getInt("zookeeper.recovery.retry", 30);
-    int zkRetryIntervalMs = conf.getInt("zookeeper.recovery.retry.intervalmill", 1000);
-    this.zk = CuratorFrameworkFactory.builder()
-        .connectString(ZKConfig.getZKQuorumServersString(conf)).sessionTimeoutMs(zkSessionTimeout)
-        .retryPolicy(new RetryNTimes(zkRetry, zkRetryIntervalMs))
-        .threadFactory(
-          Threads.newDaemonThreadFactory(String.format("ZKClusterRegistry-0x%08x", hashCode())))
-        .build();
-    this.zk.start();
-    // TODO: temporary workaround for HBASE-19312, must be removed before 2.0.0 release!
-    try {
-      this.zk.blockUntilConnected(2, TimeUnit.SECONDS);
-    } catch (InterruptedException e) {
-      return;
-    }
+    this.zk = new ReadOnlyZKClient(conf);
   }
 
-  private interface CuratorEventProcessor<T> {
-    T process(CuratorEvent event) throws Exception;
+  private interface Converter<T> {
+    T convert(byte[] data) throws Exception;
   }
 
-  private static <T> CompletableFuture<T> exec(BackgroundPathable<?> opBuilder, String path,
-      CuratorEventProcessor<T> processor) {
+  private <T> CompletableFuture<T> getAndConvert(String path, Converter<T> converter) {
     CompletableFuture<T> future = new CompletableFuture<>();
-    try {
-      opBuilder.inBackground((client, event) -> {
-        try {
-          future.complete(processor.process(event));
-        } catch (Exception e) {
-          future.completeExceptionally(e);
-        }
-      }).withUnhandledErrorListener((msg, e) -> future.completeExceptionally(e)).forPath(path);
-    } catch (Exception e) {
-      future.completeExceptionally(e);
-    }
+    zk.get(path).whenComplete((data, error) -> {
+      if (error != null) {
+        future.completeExceptionally(error);
+        return;
+      }
+      try {
+        future.complete(converter.convert(data));
+      } catch (Exception e) {
+        future.completeExceptionally(e);
+      }
+    });
     return future;
   }
 
-  private static String getClusterId(CuratorEvent event) throws DeserializationException {
-    byte[] data = event.getData();
+  private static String getClusterId(byte[] data) throws DeserializationException {
     if (data == null || data.length == 0) {
       return null;
     }
@@ -120,17 +93,15 @@ class ZKAsyncRegistry implements AsyncRegistry {
 
   @Override
   public CompletableFuture<String> getClusterId() {
-    return exec(zk.getData(), znodePaths.clusterIdZNode, ZKAsyncRegistry::getClusterId);
+    return getAndConvert(znodePaths.clusterIdZNode, ZKAsyncRegistry::getClusterId);
   }
 
   @VisibleForTesting
-  CuratorFramework getCuratorFramework() {
+  ReadOnlyZKClient getZKClient() {
     return zk;
   }
 
-  private static ZooKeeperProtos.MetaRegionServer getMetaProto(CuratorEvent event)
-      throws IOException {
-    byte[] data = event.getData();
+  private static ZooKeeperProtos.MetaRegionServer getMetaProto(byte[] data) throws IOException {
     if (data == null || data.length == 0) {
       return null;
     }
@@ -169,7 +140,7 @@ class ZKAsyncRegistry implements AsyncRegistry {
     MutableInt remaining = new MutableInt(locs.length);
     znodePaths.metaReplicaZNodes.forEach((replicaId, path) -> {
       if (replicaId == DEFAULT_REPLICA_ID) {
-        exec(zk.getData(), path, ZKAsyncRegistry::getMetaProto).whenComplete((proto, error) -> {
+        getAndConvert(path, ZKAsyncRegistry::getMetaProto).whenComplete((proto, error) -> {
           if (error != null) {
             future.completeExceptionally(error);
             return;
@@ -184,13 +155,13 @@ class ZKAsyncRegistry implements AsyncRegistry {
               new IOException("Meta region is in state " + stateAndServerName.getFirst()));
             return;
           }
-          locs[DEFAULT_REPLICA_ID] = new HRegionLocation(
-              getRegionInfoForDefaultReplica(FIRST_META_REGIONINFO),
-              stateAndServerName.getSecond());
+          locs[DEFAULT_REPLICA_ID] =
+              new HRegionLocation(getRegionInfoForDefaultReplica(FIRST_META_REGIONINFO),
+                  stateAndServerName.getSecond());
           tryComplete(remaining, locs, future);
         });
       } else {
-        exec(zk.getData(), path, ZKAsyncRegistry::getMetaProto).whenComplete((proto, error) -> {
+        getAndConvert(path, ZKAsyncRegistry::getMetaProto).whenComplete((proto, error) -> {
           if (future.isDone()) {
             return;
           }
@@ -203,13 +174,13 @@ class ZKAsyncRegistry implements AsyncRegistry {
           } else {
             Pair<RegionState.State, ServerName> stateAndServerName = getStateAndServerName(proto);
             if (stateAndServerName.getFirst() != RegionState.State.OPEN) {
-              LOG.warn("Meta region for replica " + replicaId + " is in state "
-                  + stateAndServerName.getFirst());
+              LOG.warn("Meta region for replica " + replicaId + " is in state " +
+                  stateAndServerName.getFirst());
               locs[replicaId] = null;
             } else {
-              locs[replicaId] = new HRegionLocation(
-                  getRegionInfoForReplica(FIRST_META_REGIONINFO, replicaId),
-                  stateAndServerName.getSecond());
+              locs[replicaId] =
+                  new HRegionLocation(getRegionInfoForReplica(FIRST_META_REGIONINFO, replicaId),
+                      stateAndServerName.getSecond());
             }
           }
           tryComplete(remaining, locs, future);
@@ -219,18 +190,12 @@ class ZKAsyncRegistry implements AsyncRegistry {
     return future;
   }
 
-  private static int getCurrentNrHRS(CuratorEvent event) {
-    Stat stat = event.getStat();
-    return stat != null ? stat.getNumChildren() : 0;
-  }
-
   @Override
   public CompletableFuture<Integer> getCurrentNrHRS() {
-    return exec(zk.checkExists(), znodePaths.rsZNode, ZKAsyncRegistry::getCurrentNrHRS);
+    return zk.exists(znodePaths.rsZNode).thenApply(s -> s != null ? s.getNumChildren() : 0);
   }
 
-  private static ZooKeeperProtos.Master getMasterProto(CuratorEvent event) throws IOException {
-    byte[] data = event.getData();
+  private static ZooKeeperProtos.Master getMasterProto(byte[] data) throws IOException {
     if (data == null || data.length == 0) {
       return null;
     }
@@ -241,7 +206,7 @@ class ZKAsyncRegistry implements AsyncRegistry {
 
   @Override
   public CompletableFuture<ServerName> getMasterAddress() {
-    return exec(zk.getData(), znodePaths.masterAddressZNode, ZKAsyncRegistry::getMasterProto)
+    return getAndConvert(znodePaths.masterAddressZNode, ZKAsyncRegistry::getMasterProto)
         .thenApply(proto -> {
           if (proto == null) {
             return null;
@@ -254,7 +219,7 @@ class ZKAsyncRegistry implements AsyncRegistry {
 
   @Override
   public CompletableFuture<Integer> getMasterInfoPort() {
-    return exec(zk.getData(), znodePaths.masterAddressZNode, ZKAsyncRegistry::getMasterProto)
+    return getAndConvert(znodePaths.masterAddressZNode, ZKAsyncRegistry::getMasterProto)
         .thenApply(proto -> proto != null ? proto.getInfoPort() : 0);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/7a5b0783/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java
new file mode 100644
index 0000000..965a243
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ReadOnlyZKClient.java
@@ -0,0 +1,347 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZK_SESSION_TIMEOUT;
+import static org.apache.hadoop.hbase.HConstants.ZK_SESSION_TIMEOUT;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * A very simple read only zookeeper implementation without watcher support.
+ */
+@InterfaceAudience.Private
+public final class ReadOnlyZKClient implements Closeable {
+
+  private static final Log LOG = LogFactory.getLog(ReadOnlyZKClient.class);
+
+  public static final String RECOVERY_RETRY = "zookeeper.recovery.retry";
+
+  private static final int DEFAULT_RECOVERY_RETRY = 30;
+
+  public static final String RECOVERY_RETRY_INTERVAL_MILLIS =
+      "zookeeper.recovery.retry.intervalmill";
+
+  private static final int DEFAULT_RECOVERY_RETRY_INTERVAL_MILLIS = 1000;
+
+  public static final String KEEPALIVE_MILLIS = "zookeeper.keep-alive.time";
+
+  private static final int DEFAULT_KEEPALIVE_MILLIS = 60000;
+
+  private static final EnumSet<Code> FAIL_FAST_CODES = EnumSet.of(Code.NOAUTH, Code.AUTHFAILED);
+
+  private final String connectString;
+
+  private final int sessionTimeoutMs;
+
+  private final int maxRetries;
+
+  private final int retryIntervalMs;
+
+  private final int keepAliveTimeMs;
+
+  private static abstract class Task implements Delayed {
+
+    protected long time = System.nanoTime();
+
+    public boolean needZk() {
+      return false;
+    }
+
+    public void exec(ZooKeeper zk) {
+    }
+
+    public void connectFailed(IOException e) {
+    }
+
+    public void closed(IOException e) {
+    }
+
+    @Override
+    public int compareTo(Delayed o) {
+      Task that = (Task) o;
+      int c = Long.compare(time, that.time);
+      if (c != 0) {
+        return c;
+      }
+      return Integer.compare(System.identityHashCode(this), System.identityHashCode(that));
+    }
+
+    @Override
+    public long getDelay(TimeUnit unit) {
+      return unit.convert(time - System.nanoTime(), TimeUnit.NANOSECONDS);
+    }
+  }
+
+  private static final Task CLOSE = new Task() {
+  };
+
+  private final DelayQueue<Task> tasks = new DelayQueue<>();
+
+  private final AtomicBoolean closed = new AtomicBoolean(false);
+
+  private ZooKeeper zookeeper;
+
+  private String getId() {
+    return String.format("0x%08x", System.identityHashCode(this));
+  }
+
+  public ReadOnlyZKClient(Configuration conf) {
+    this.connectString = ZKConfig.getZKQuorumServersString(conf);
+    this.sessionTimeoutMs = conf.getInt(ZK_SESSION_TIMEOUT, DEFAULT_ZK_SESSION_TIMEOUT);
+    this.maxRetries = conf.getInt(RECOVERY_RETRY, DEFAULT_RECOVERY_RETRY);
+    this.retryIntervalMs =
+        conf.getInt(RECOVERY_RETRY_INTERVAL_MILLIS, DEFAULT_RECOVERY_RETRY_INTERVAL_MILLIS);
+    this.keepAliveTimeMs = conf.getInt(KEEPALIVE_MILLIS, DEFAULT_KEEPALIVE_MILLIS);
+    LOG.info("Start read only zookeeper connection " + getId() + " to " + connectString +
+        ", session timeout " + sessionTimeoutMs + " ms, retries " + maxRetries +
+        ", retry interval " + retryIntervalMs + " ms, keep alive " + keepAliveTimeMs + " ms");
+    Thread t = new Thread(this::run, "ReadOnlyZKClient");
+    t.setDaemon(true);
+    t.start();
+  }
+
+  private abstract class ZKTask<T> extends Task {
+
+    protected final String path;
+
+    private final CompletableFuture<T> future;
+
+    private final String operationType;
+
+    private int retries;
+
+    protected ZKTask(String path, CompletableFuture<T> future, String operationType) {
+      this.path = path;
+      this.future = future;
+      this.operationType = operationType;
+    }
+
+    protected final void onComplete(ZooKeeper zk, int rc, T ret, boolean errorIfNoNode) {
+      tasks.add(new Task() {
+
+        @Override
+        public void exec(ZooKeeper alwaysNull) {
+          Code code = Code.get(rc);
+          if (code == Code.OK) {
+            future.complete(ret);
+          } else if (code == Code.NONODE) {
+            if (errorIfNoNode) {
+              future.completeExceptionally(KeeperException.create(code, path));
+            } else {
+              future.complete(ret);
+            }
+          } else if (FAIL_FAST_CODES.contains(code)) {
+            future.completeExceptionally(KeeperException.create(code, path));
+          } else {
+            if (code == Code.SESSIONEXPIRED) {
+              LOG.warn(getId() + " session expired, close and reconnect");
+              try {
+                zk.close();
+              } catch (InterruptedException e) {
+              }
+            }
+            if (ZKTask.this.delay(retryIntervalMs, maxRetries)) {
+              LOG.warn(getId() + " failed for " + operationType + " of " + path + ", code = " +
+                  code + ", retries = " + ZKTask.this.retries);
+              tasks.add(ZKTask.this);
+            } else {
+              LOG.warn(getId() + " failed for " + operationType + " of " + path + ", code = " +
+                  code + ", retries = " + ZKTask.this.retries + ", give up");
+              future.completeExceptionally(KeeperException.create(code, path));
+            }
+          }
+        }
+
+        @Override
+        public void closed(IOException e) {
+          // It may happen that a request is succeeded and the onComplete has been called and pushed
+          // us into the task queue, but before we get called a close is called and here we will
+          // fail the request, although it is succeeded actually.
+          // This is not a perfect solution but anyway, it is better than hang the requests for
+          // ever, and also acceptable as if you close the zk client before actually getting the
+          // response then a failure is always possible.
+          future.completeExceptionally(e);
+        }
+      });
+    }
+
+    @Override
+    public boolean needZk() {
+      return true;
+    }
+
+    public boolean delay(long intervalMs, int maxRetries) {
+      if (retries >= maxRetries) {
+        return false;
+      }
+      retries++;
+      time = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(intervalMs);
+      return true;
+    }
+
+    @Override
+    public void connectFailed(IOException e) {
+      if (delay(retryIntervalMs, maxRetries)) {
+        LOG.warn(getId() + " failed to connect to zk for " + operationType + " of " + path +
+            ", retries = " + retries,
+          e);
+        tasks.add(this);
+      } else {
+        LOG.warn(getId() + " failed to connect to zk for " + operationType + " of " + path +
+            ", retries = " + retries + ", give up",
+          e);
+        future.completeExceptionally(e);
+      }
+    }
+
+    @Override
+    public void closed(IOException e) {
+      future.completeExceptionally(e);
+    }
+  }
+
+  private static <T> CompletableFuture<T> failed(Throwable e) {
+    CompletableFuture<T> future = new CompletableFuture<>();
+    future.completeExceptionally(e);
+    return future;
+  }
+
+  public CompletableFuture<byte[]> get(String path) {
+    if (closed.get()) {
+      return failed(new IOException("Client already closed"));
+    }
+    CompletableFuture<byte[]> future = new CompletableFuture<>();
+    tasks.add(new ZKTask<byte[]>(path, future, "get") {
+
+      @Override
+      public void exec(ZooKeeper zk) {
+        zk.getData(path, false, (rc, path, ctx, data, stat) -> onComplete(zk, rc, data, true),
+          null);
+      }
+    });
+    return future;
+  }
+
+  public CompletableFuture<Stat> exists(String path) {
+    if (closed.get()) {
+      return failed(new IOException("Client already closed"));
+    }
+    CompletableFuture<Stat> future = new CompletableFuture<>();
+    tasks.add(new ZKTask<Stat>(path, future, "exists") {
+
+      @Override
+      public void exec(ZooKeeper zk) {
+        zk.exists(path, false, (rc, path, ctx, stat) -> onComplete(zk, rc, stat, false), null);
+      }
+    });
+    return future;
+  }
+
+  private void closeZk() {
+    if (zookeeper != null) {
+      try {
+        zookeeper.close();
+      } catch (InterruptedException e) {
+      }
+      zookeeper = null;
+    }
+  }
+
+  private ZooKeeper getZk() throws IOException {
+    // may be closed when session expired
+    if (zookeeper == null || !zookeeper.getState().isAlive()) {
+      zookeeper = new ZooKeeper(connectString, sessionTimeoutMs, e -> {
+      });
+    }
+    return zookeeper;
+  }
+
+  private void run() {
+    for (;;) {
+      Task task;
+      try {
+        task = tasks.poll(keepAliveTimeMs, TimeUnit.MILLISECONDS);
+      } catch (InterruptedException e) {
+        continue;
+      }
+      if (task == CLOSE) {
+        break;
+      }
+      if (task == null) {
+        LOG.info(getId() + " no activities for " + keepAliveTimeMs +
+            " ms, close active connection. Will reconnect next time when there are new requests.");
+        closeZk();
+        continue;
+      }
+      if (!task.needZk()) {
+        task.exec(null);
+      } else {
+        ZooKeeper zk;
+        try {
+          zk = getZk();
+        } catch (IOException e) {
+          task.connectFailed(e);
+          continue;
+        }
+        task.exec(zk);
+      }
+    }
+    closeZk();
+    IOException error = new IOException("Client already closed");
+    Arrays.stream(tasks.toArray(new Task[0])).forEach(t -> t.closed(error));
+    tasks.clear();
+  }
+
+  @Override
+  public void close() {
+    if (closed.compareAndSet(false, true)) {
+      LOG.info("Close zookeeper connection " + getId() + " to " + connectString);
+      tasks.add(CLOSE);
+    }
+  }
+
+  @VisibleForTesting
+  ZooKeeper getZooKeeper() {
+    return zookeeper;
+  }
+
+  @VisibleForTesting
+  public String getConnectString() {
+    return connectString;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/7a5b0783/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java
index a8a7de0..0ca8e73 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKAsyncRegistry.java
@@ -22,7 +22,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.concurrent.ExecutionException;
@@ -30,7 +29,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.IntStream;
 
 import org.apache.commons.io.IOUtils;
-import org.apache.curator.CuratorZookeeperClient;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
@@ -38,14 +36,15 @@ import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
-import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ZKTests;
+import org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-@Category({ MediumTests.class, ClientTests.class })
+@Category({ MediumTests.class, ZKTests.class })
 public class TestZKAsyncRegistry {
 
   private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@@ -54,33 +53,34 @@ public class TestZKAsyncRegistry {
 
   // waits for all replicas to have region location
   static void waitUntilAllReplicasHavingRegionLocation(TableName tbl) throws IOException {
-    TEST_UTIL.waitFor(TEST_UTIL.getConfiguration()
-        .getLong("hbase.client.sync.wait.timeout.msec", 60000),
-        200, true, new ExplainingPredicate<IOException>() {
-      @Override
-      public String explainFailure() throws IOException {
-        return TEST_UTIL.explainTableAvailability(tbl);
-      }
+    TEST_UTIL.waitFor(
+      TEST_UTIL.getConfiguration().getLong("hbase.client.sync.wait.timeout.msec", 60000), 200, true,
+      new ExplainingPredicate<IOException>() {
+        @Override
+        public String explainFailure() throws IOException {
+          return TEST_UTIL.explainTableAvailability(tbl);
+        }
 
-      @Override
-      public boolean evaluate() throws IOException {
-        AtomicBoolean ready = new AtomicBoolean(true);
-        try {
-          RegionLocations locs = REGISTRY.getMetaRegionLocation().get();
-          assertEquals(3, locs.getRegionLocations().length);
-          IntStream.range(0, 3).forEach(i -> {
-            HRegionLocation loc = locs.getRegionLocation(i);
-            if (loc == null) {
-              ready.set(false);
-            }
-          });
-        } catch (Exception e) {
-          ready.set(false);
+        @Override
+        public boolean evaluate() throws IOException {
+          AtomicBoolean ready = new AtomicBoolean(true);
+          try {
+            RegionLocations locs = REGISTRY.getMetaRegionLocation().get();
+            assertEquals(3, locs.getRegionLocations().length);
+            IntStream.range(0, 3).forEach(i -> {
+              HRegionLocation loc = locs.getRegionLocation(i);
+              if (loc == null) {
+                ready.set(false);
+              }
+            });
+          } catch (Exception e) {
+            ready.set(false);
+          }
+          return ready.get();
         }
-        return ready.get();
-      }
-    });
+      });
   }
+
   @BeforeClass
   public static void setUp() throws Exception {
     TEST_UTIL.getConfiguration().setInt(META_REPLICAS_NUM, 3);
@@ -109,25 +109,26 @@ public class TestZKAsyncRegistry {
     IntStream.range(0, 3).forEach(i -> {
       HRegionLocation loc = locs.getRegionLocation(i);
       assertNotNull("Replica " + i + " doesn't have location", loc);
-      assertTrue(loc.getRegionInfo().getTable().equals(TableName.META_TABLE_NAME));
-      assertEquals(i, loc.getRegionInfo().getReplicaId());
+      assertEquals(TableName.META_TABLE_NAME, loc.getRegion().getTable());
+      assertEquals(i, loc.getRegion().getReplicaId());
     });
   }
 
   @Test
   public void testIndependentZKConnections() throws IOException {
-    final CuratorZookeeperClient zk1 = REGISTRY.getCuratorFramework().getZookeeperClient();
+    ReadOnlyZKClient zk1 = REGISTRY.getZKClient();
 
-    final Configuration otherConf = new Configuration(TEST_UTIL.getConfiguration());
+    Configuration otherConf = new Configuration(TEST_UTIL.getConfiguration());
     otherConf.set(HConstants.ZOOKEEPER_QUORUM, "127.0.0.1");
-    try (final ZKAsyncRegistry otherRegistry = new ZKAsyncRegistry(otherConf)) {
-      final CuratorZookeeperClient zk2 = otherRegistry.getCuratorFramework().getZookeeperClient();
+    try (ZKAsyncRegistry otherRegistry = new ZKAsyncRegistry(otherConf)) {
+      ReadOnlyZKClient zk2 = otherRegistry.getZKClient();
 
       assertNotSame("Using a different configuration / quorum should result in different backing " +
-          "zk connection.", zk1, zk2);
-      assertNotEquals("Using a different configrution / quorum should be reflected in the " +
-          "zk connection.", zk1.getCurrentConnectionString(), zk2.getCurrentConnectionString());
+          "zk connection.",
+        zk1, zk2);
+      assertNotEquals(
+        "Using a different configrution / quorum should be reflected in the " + "zk connection.",
+        zk1.getConnectString(), zk2.getConnectString());
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/7a5b0783/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java
----------------------------------------------------------------------
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java
index ef643bf..cb8681c 100644
--- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/MiniZooKeeperCluster.java
@@ -469,4 +469,8 @@ public class MiniZooKeeperCluster {
     return activeZKServerIndex < 0 || activeZKServerIndex >= clientPortList.size() ? -1
         : clientPortList.get(activeZKServerIndex);
   }
+
+  List<ZooKeeperServer> getZooKeeperServers() {
+    return zooKeeperServers;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/7a5b0783/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKMainServer.java
----------------------------------------------------------------------
diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKMainServer.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKMainServer.java
index 9cb0e7d..2db83eb 100644
--- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKMainServer.java
+++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKMainServer.java
@@ -1,5 +1,4 @@
-/*
- *
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -22,7 +21,6 @@ package org.apache.hadoop.hbase.zookeeper;
 import java.io.IOException;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.curator.shaded.com.google.common.base.Stopwatch;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -30,6 +28,8 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeperMain;
 
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Stopwatch;
+
 /**
  * Tool for running ZookeeperMain from HBase by  reading a ZooKeeper server
  * from HBase XML configuration.

http://git-wip-us.apache.org/repos/asf/hbase/blob/7a5b0783/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestInstancePending.java
----------------------------------------------------------------------
diff --git a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestInstancePending.java b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestInstancePending.java
index e67c9fd..5c984a5 100644
--- a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestInstancePending.java
+++ b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestInstancePending.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,17 +15,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.hbase.zookeeper;
 
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.testclassification.ZKTests;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-@Category(SmallTests.class)
+@Category({ ZKTests.class, SmallTests.class })
 public class TestInstancePending {
   @Test(timeout = 1000)
   public void test() throws Exception {

http://git-wip-us.apache.org/repos/asf/hbase/blob/7a5b0783/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java
----------------------------------------------------------------------
diff --git a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java
new file mode 100644
index 0000000..765ddf9
--- /dev/null
+++ b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ZKTests;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ ZKTests.class, MediumTests.class })
+public class TestReadOnlyZKClient {
+
+  private static HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility();
+
+  private static MiniZooKeeperCluster CLUSTER;
+
+  private static int PORT;
+
+  private static String PATH = "/test";
+
+  private static byte[] DATA;
+
+  private static int CHILDREN = 5;
+
+  private static ReadOnlyZKClient RO_ZK;
+
+  @BeforeClass
+  public static void setUp() throws IOException, InterruptedException, KeeperException {
+    File file =
+        new File(UTIL.getDataTestDir("zkcluster_" + UUID.randomUUID().toString()).toString());
+    CLUSTER = new MiniZooKeeperCluster(UTIL.getConfiguration());
+    PORT = CLUSTER.startup(file);
+    ZooKeeper zk = new ZooKeeper("localhost:" + PORT, 10000, e -> {
+    });
+    DATA = new byte[10];
+    ThreadLocalRandom.current().nextBytes(DATA);
+    zk.create(PATH, DATA, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+    for (int i = 0; i < CHILDREN; i++) {
+      zk.create(PATH + "/c" + i, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+    }
+    zk.close();
+    Configuration conf = UTIL.getConfiguration();
+    conf.set(HConstants.ZOOKEEPER_QUORUM, "localhost:" + PORT);
+    conf.setInt(ReadOnlyZKClient.RECOVERY_RETRY, 3);
+    conf.setInt(ReadOnlyZKClient.RECOVERY_RETRY_INTERVAL_MILLIS, 100);
+    conf.setInt(ReadOnlyZKClient.KEEPALIVE_MILLIS, 3000);
+    RO_ZK = new ReadOnlyZKClient(conf);
+    // only connect when necessary
+    assertNull(RO_ZK.getZooKeeper());
+  }
+
+  @AfterClass
+  public static void tearDown() throws IOException {
+    RO_ZK.close();
+    CLUSTER.shutdown();
+    UTIL.cleanupTestDir();
+  }
+
+  @Test
+  public void testGetAndExists() throws InterruptedException, ExecutionException {
+    assertArrayEquals(DATA, RO_ZK.get(PATH).get());
+    assertEquals(CHILDREN, RO_ZK.exists(PATH).get().getNumChildren());
+    assertNotNull(RO_ZK.getZooKeeper());
+    // a little longer than keep alive millis
+    Thread.sleep(5000);
+    assertNull(RO_ZK.getZooKeeper());
+  }
+
+  @Test
+  public void testNoNode() throws InterruptedException, ExecutionException {
+    String pathNotExists = PATH + "_whatever";
+    try {
+      RO_ZK.get(pathNotExists).get();
+      fail("should fail because of " + pathNotExists + " does not exist");
+    } catch (ExecutionException e) {
+      assertThat(e.getCause(), instanceOf(KeeperException.class));
+      KeeperException ke = (KeeperException) e.getCause();
+      assertEquals(Code.NONODE, ke.code());
+      assertEquals(pathNotExists, ke.getPath());
+    }
+    // exists will not throw exception.
+    assertNull(RO_ZK.exists(pathNotExists).get());
+  }
+
+  @Test
+  public void testSessionExpire() throws Exception {
+    assertArrayEquals(DATA, RO_ZK.get(PATH).get());
+    ZooKeeper zk = RO_ZK.getZooKeeper();
+    long sessionId = zk.getSessionId();
+    CLUSTER.getZooKeeperServers().get(0).closeSession(sessionId);
+    // should not reach keep alive so still the same instance
+    assertSame(zk, RO_ZK.getZooKeeper());
+
+    assertArrayEquals(DATA, RO_ZK.get(PATH).get());
+    assertNotNull(RO_ZK.getZooKeeper());
+    assertNotSame(zk, RO_ZK.getZooKeeper());
+    assertNotEquals(sessionId, RO_ZK.getZooKeeper().getSessionId());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/7a5b0783/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKMetrics.java
----------------------------------------------------------------------
diff --git a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKMetrics.java b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKMetrics.java
index 2811cc5..e43a5c8 100644
--- a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKMetrics.java
+++ b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKMetrics.java
@@ -1,5 +1,4 @@
-/*
- *
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -23,10 +22,11 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.testclassification.ZKTests;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-@Category(SmallTests.class)
+@Category({ ZKTests.class, SmallTests.class })
 public class TestZKMetrics {
 
   @Test

http://git-wip-us.apache.org/repos/asf/hbase/blob/7a5b0783/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java
----------------------------------------------------------------------
diff --git a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java
index 7006040..db96392 100644
--- a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java
+++ b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.testclassification.ZKTests;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs.Ids;
@@ -37,10 +38,7 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.mockito.Mockito;
 
-/**
- *
- */
-@Category({SmallTests.class})
+@Category({ ZKTests.class, SmallTests.class })
 public class TestZKUtil {
 
   @Test

http://git-wip-us.apache.org/repos/asf/hbase/blob/7a5b0783/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKWatcher.java
----------------------------------------------------------------------
diff --git a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKWatcher.java b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKWatcher.java
index bd4575d..f3d0b03 100644
--- a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKWatcher.java
+++ b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKWatcher.java
@@ -24,10 +24,11 @@ import java.io.IOException;
 
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.testclassification.ZKTests;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-@Category({ SmallTests.class })
+@Category({ ZKTests.class, SmallTests.class })
 public class TestZKWatcher {
 
   @Test

http://git-wip-us.apache.org/repos/asf/hbase/blob/7a5b0783/hbase-zookeeper/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/hbase-zookeeper/src/test/resources/log4j.properties b/hbase-zookeeper/src/test/resources/log4j.properties
new file mode 100644
index 0000000..c322699
--- /dev/null
+++ b/hbase-zookeeper/src/test/resources/log4j.properties
@@ -0,0 +1,68 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Define some default values that can be overridden by system properties
+hbase.root.logger=INFO,console
+hbase.log.dir=.
+hbase.log.file=hbase.log
+
+# Define the root logger to the system property "hbase.root.logger".
+log4j.rootLogger=${hbase.root.logger}
+
+# Logging Threshold
+log4j.threshold=ALL
+
+#
+# Daily Rolling File Appender
+#
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${hbase.log.dir}/${hbase.log.file}
+
+# Rollver at midnight
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+
+# 30-day backup
+#log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+# Debugging Pattern format
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %C{2}(%L): %m%n
+
+
+#
+# console
+# Add "console" to rootlogger above if you want to use this
+#
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %C{2}(%L): %m%n
+
+# Custom Logging levels
+
+#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG
+
+log4j.logger.org.apache.hadoop=WARN
+log4j.logger.org.apache.zookeeper=ERROR
+log4j.logger.org.apache.hadoop.hbase=DEBUG
+
+#These settings are workarounds against spurious logs from the minicluster.
+#See HBASE-4709
+log4j.logger.org.apache.hadoop.metrics2.impl.MetricsConfig=WARN
+log4j.logger.org.apache.hadoop.metrics2.impl.MetricsSinkAdapter=WARN
+log4j.logger.org.apache.hadoop.metrics2.impl.MetricsSystemImpl=WARN
+log4j.logger.org.apache.hadoop.metrics2.util.MBeans=WARN
+# Enable this to get detailed connection error/retry logging.
+# log4j.logger.org.apache.hadoop.hbase.client.ConnectionImplementation=TRACE

http://git-wip-us.apache.org/repos/asf/hbase/blob/7a5b0783/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 932bc26..53aa115 100755
--- a/pom.xml
+++ b/pom.xml
@@ -3273,6 +3273,21 @@
         <surefire.secondPartGroups></surefire.secondPartGroups>
       </properties>
     </profile>
+    <profile>
+      <id>runZKTests</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <properties>
+        <surefire.firstPartForkCount>1</surefire.firstPartForkCount>
+        <surefire.secondPartForkCount>1</surefire.secondPartForkCount>
+        <surefire.skipFirstPart>false</surefire.skipFirstPart>
+        <surefire.skipSecondPart>true</surefire.skipSecondPart>
+        <surefire.firstPartGroups>org.apache.hadoop.hbase.testclassification.ZKTests
+        </surefire.firstPartGroups>
+        <surefire.secondPartGroups></surefire.secondPartGroups>
+      </properties>
+    </profile>
 
     <profile>
       <!-- Use it to launch tests locally-->