You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/10/30 01:16:03 UTC

[4/4] helix git commit: Using HelixZkClient to replace ZkClient in helix-core and helix-rest.

Using HelixZkClient to replace ZkClient in helix-core and helix-rest.

1. Replace as much usage as possible. For the raw ZkClient tests, the usages are kept.
2. For backward compatibility, some public interfaces still returns ZkClient. Marks them as Deprecated.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/9d7364d7
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/9d7364d7
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/9d7364d7

Branch: refs/heads/master
Commit: 9d7364d7abba3932a1b25e96e4eb9dd3e203cec9
Parents: 01076ca
Author: Jiajun Wang <jj...@linkedin.com>
Authored: Wed Sep 26 11:39:42 2018 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Mon Oct 29 18:15:22 2018 -0700

----------------------------------------------------------------------
 .../controller/HierarchicalDataHolder.java      |  10 +-
 .../examples/IdealStateBuilderExample.java      |  12 +-
 .../helix/examples/IdealStateExample.java       |  14 +-
 .../helix/manager/zk/CallbackHandler.java       |  10 +-
 .../helix/manager/zk/ParticipantManager.java    |   2 +-
 .../apache/helix/manager/zk/ZKHelixManager.java |  12 +-
 .../helix/manager/zk/zookeeper/ZkClient.java    |   4 +
 .../java/org/apache/helix/task/TaskDriver.java  |   8 +-
 .../tools/ClusterExternalViewVerifier.java      |   4 +-
 .../helix/tools/ClusterLiveNodesVerifier.java   |   6 +-
 .../org/apache/helix/tools/ClusterSetup.java    |  13 +-
 .../helix/tools/ClusterStateVerifier.java       |  33 ++-
 .../org/apache/helix/tools/ClusterVerifier.java |   6 +-
 .../BestPossibleExternalViewVerifier.java       |  13 +-
 .../ClusterLiveNodesVerifier.java               |   6 +-
 .../StrictMatchExternalViewVerifier.java        |  14 +-
 .../ZkHelixClusterVerifier.java                 |  19 +-
 .../org/apache/helix/tools/TestExecutor.java    |  13 +-
 .../tools/commandtools/IntegrationTestUtil.java |  19 +-
 .../org/apache/helix/util/ZKClientPool.java     |   2 +-
 .../test/java/org/apache/helix/TestHelper.java  |  33 ++-
 .../apache/helix/TestHierarchicalDataStore.java |  24 +-
 .../java/org/apache/helix/TestZKCallback.java   |   2 -
 .../test/java/org/apache/helix/TestZkBasis.java |  88 ++++++-
 .../java/org/apache/helix/TestZnodeModify.java  |  21 +-
 .../java/org/apache/helix/ZkTestHelper.java     |  68 +++--
 .../org/apache/helix/common/ZkTestBase.java     |  54 ++--
 .../TestCorrectnessOnConnectivityLoss.java      |  18 +-
 .../apache/helix/integration/TestDriver.java    |  84 +++---
 .../integration/TestEnableCompression.java      |  15 +-
 .../integration/TestEntropyFreeNodeBounce.java  |   2 +-
 .../helix/integration/TestPauseSignal.java      |   6 +-
 .../integration/TestResourceGroupEndtoEnd.java  |   6 +-
 .../helix/integration/TestStatusUpdate.java     |   7 +-
 .../integration/TestZkCallbackHandlerLeak.java  |   4 +-
 .../helix/integration/TestZkConnectionLost.java |  16 +-
 .../manager/ClusterControllerManager.java       |   6 +-
 .../manager/ClusterDistributedController.java   |   6 +-
 .../manager/MockParticipantManager.java         |   6 +-
 .../integration/manager/ZkTestManager.java      |   4 +-
 .../rebalancer/TestAutoRebalance.java           |   7 +-
 .../TestAutoRebalancePartitionLimit.java        |   8 +-
 .../TestCustomizedIdealStateRebalancer.java     |   8 +-
 .../rebalancer/TestFullAutoNodeTagging.java     |  15 +-
 .../helix/manager/zk/TestZNRecordSizeLimit.java | 256 ++++++++++---------
 .../zk/TestZkCacheAsyncOpSingleThread.java      |   5 +-
 .../zk/TestZkCacheSyncOpSingleThread.java       |   8 +-
 .../apache/helix/manager/zk/TestZkFlapping.java |  82 +-----
 .../helix/manager/zk/TestZkReconnect.java       |  23 +-
 .../org/apache/helix/mock/MockZkClient.java     |   3 +-
 .../helix/mock/controller/MockController.java   |  10 +-
 .../helix/participant/MockZKHelixManager.java   |   4 +-
 .../store/zk/TestZkHelixPropertyStore.java      |   4 +-
 .../apache/helix/tools/TestClusterSetup.java    |  39 ++-
 .../apache/helix/rest/server/ServerContext.java |  27 +-
 .../rest/server/resources/AbstractResource.java |   9 -
 .../resources/helix/AbstractHelixResource.java  |  10 +-
 .../server/resources/helix/ClusterAccessor.java |   4 +-
 .../resources/helix/ResourceAccessor.java       |   8 +-
 .../helix/rest/server/AbstractTestClass.java    |  21 +-
 60 files changed, 663 insertions(+), 578 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/controller/HierarchicalDataHolder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/HierarchicalDataHolder.java b/helix-core/src/main/java/org/apache/helix/controller/HierarchicalDataHolder.java
index 3f3d999..27ecc40 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/HierarchicalDataHolder.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/HierarchicalDataHolder.java
@@ -26,7 +26,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,16 +42,16 @@ public class HierarchicalDataHolder<T> {
    * currentVersion, gets updated when data is read from original source
    */
   AtomicLong currentVersion;
-  private final ZkClient _zkClient;
+  private final HelixZkClient _zkClient;
   private final String _rootPath;
   private final FileFilter _filter;
 
-  public HierarchicalDataHolder(ZkClient client, String rootPath, FileFilter filter) {
+  public HierarchicalDataHolder(HelixZkClient client, String rootPath, FileFilter filter) {
     this._zkClient = client;
     this._rootPath = rootPath;
     this._filter = filter;
     // Node<T> initialValue = new Node<T>();
-    root = new AtomicReference<HierarchicalDataHolder.Node<T>>();
+    root = new AtomicReference<>();
     currentVersion = new AtomicLong(1);
     refreshData();
   }
@@ -99,7 +99,7 @@ public class HierarchicalDataHolder<T> {
           Node<T> oldChild =
               (oldRoot != null && oldRoot.children != null) ? oldRoot.children.get(child) : null;
           if (newRoot.children == null) {
-            newRoot.children = new ConcurrentHashMap<String, HierarchicalDataHolder.Node<T>>();
+            newRoot.children = new ConcurrentHashMap<>();
           }
           if (!newRoot.children.contains(child)) {
             newRoot.children.put(child, new Node<T>());

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/examples/IdealStateBuilderExample.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/examples/IdealStateBuilderExample.java b/helix-core/src/main/java/org/apache/helix/examples/IdealStateBuilderExample.java
index b89d830..71e6662 100644
--- a/helix-core/src/main/java/org/apache/helix/examples/IdealStateBuilderExample.java
+++ b/helix-core/src/main/java/org/apache/helix/examples/IdealStateBuilderExample.java
@@ -22,7 +22,8 @@ package org.apache.helix.examples;
 import org.apache.helix.controller.HelixControllerMain;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.InstanceConfig;
@@ -51,10 +52,11 @@ public class IdealStateBuilderExample {
     final String clusterName = args[1];
     RebalanceMode idealStateMode = RebalanceMode.valueOf(args[2].toUpperCase());
 
-    ZkClient zkclient =
-        new ZkClient(zkAddr, ZkClient.DEFAULT_SESSION_TIMEOUT, ZkClient.DEFAULT_CONNECTION_TIMEOUT,
-            new ZNRecordSerializer());
-    ZKHelixAdmin admin = new ZKHelixAdmin(zkclient);
+    HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
+    clientConfig.setZkSerializer(new ZNRecordSerializer());
+    final HelixZkClient zkClient = SharedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr), clientConfig);
+    ZKHelixAdmin admin = new ZKHelixAdmin(zkClient);
 
     // add cluster
     admin.addCluster(clusterName, true);

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/examples/IdealStateExample.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/examples/IdealStateExample.java b/helix-core/src/main/java/org/apache/helix/examples/IdealStateExample.java
index 7c5192d..723cbbe 100644
--- a/helix-core/src/main/java/org/apache/helix/examples/IdealStateExample.java
+++ b/helix-core/src/main/java/org/apache/helix/examples/IdealStateExample.java
@@ -22,7 +22,8 @@ package org.apache.helix.examples;
 import org.apache.helix.controller.HelixControllerMain;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.StateModelDefinition;
@@ -31,7 +32,7 @@ import org.apache.helix.tools.StateModelConfigGenerator;
 /**
  * Ideal state json format file used in this example for CUSTOMIZED ideal state mode
  * <p>
- * 
+ *
  * <pre>
  * {
  * "id" : "TestDB",
@@ -93,10 +94,11 @@ public class IdealStateExample {
     }
 
     // add cluster {clusterName}
-    ZkClient zkclient =
-        new ZkClient(zkAddr, ZkClient.DEFAULT_SESSION_TIMEOUT, ZkClient.DEFAULT_CONNECTION_TIMEOUT,
-            new ZNRecordSerializer());
-    ZKHelixAdmin admin = new ZKHelixAdmin(zkclient);
+    HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
+    clientConfig.setZkSerializer(new ZNRecordSerializer());
+    HelixZkClient zkClient = SharedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr), clientConfig);
+    ZKHelixAdmin admin = new ZKHelixAdmin(zkClient);
     admin.addCluster(clusterName, true);
 
     // add MasterSlave state mode definition

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
index b6d452d..9e9d1a7 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
@@ -179,19 +179,11 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
    */
   private List<NotificationContext.Type> _expectTypes = nextNotificationType.get(Type.FINALIZE);
 
-  @Deprecated
-  public CallbackHandler(HelixManager manager, ZkClient client, PropertyKey propertyKey,
+  public CallbackHandler(HelixManager manager, HelixZkClient client, PropertyKey propertyKey,
       Object listener, EventType[] eventTypes, ChangeType changeType) {
     this(manager, client, propertyKey, listener, eventTypes, changeType, null);
   }
 
-  @Deprecated
-  public CallbackHandler(HelixManager manager, ZkClient client, PropertyKey propertyKey,
-      Object listener, EventType[] eventTypes, ChangeType changeType,
-      HelixCallbackMonitor monitor) {
-    this(manager, (HelixZkClient) client, propertyKey, listener, eventTypes, changeType, monitor);
-  }
-
   public CallbackHandler(HelixManager manager, HelixZkClient client, PropertyKey propertyKey,
         Object listener, EventType[] eventTypes, ChangeType changeType,
         HelixCallbackMonitor monitor) {

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
index 36fb969..28641e0 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
@@ -77,7 +77,7 @@ public class ParticipantManager {
   final LiveInstanceInfoProvider _liveInstanceInfoProvider;
   final List<PreConnectCallback> _preConnectCallbacks;
 
-  public ParticipantManager(HelixManager manager, ZkClient zkclient, int sessionTimeout,
+  public ParticipantManager(HelixManager manager, HelixZkClient zkclient, int sessionTimeout,
       LiveInstanceInfoProvider liveInstanceInfoProvider, List<PreConnectCallback> preConnectCallbacks) {
     _zkclient = zkclient;
     _manager = manager;

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index 98e2737..c673f51 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -715,6 +715,8 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
       _messagingService.getExecutor().shutdown();
 
       // TODO reset user defined handlers only
+      // TODO Fix the issue that when connection disconnected, reset handlers will be blocked. -- JJ
+      // This is because reset logic contains ZK operations.
       resetHandlers();
 
       if (_leaderElectionHandler != null) {
@@ -902,10 +904,10 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
     boolean isConnected;
     do {
       isConnected =
-          _zkclient.waitUntilConnected(ZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
+          _zkclient.waitUntilConnected(HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
       if (!isConnected) {
         LOG.error("fail to connect zkserver: " + _zkAddress + " in "
-            + ZkClient.DEFAULT_CONNECTION_TIMEOUT + "ms. expiredSessionId: " + _sessionId
+            + HelixZkClient.DEFAULT_CONNECTION_TIMEOUT + "ms. expiredSessionId: " + _sessionId
             + ", clusterName: " + _clusterName);
         continue;
       }
@@ -998,6 +1000,10 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
         }
 
         try {
+          // TODO Call disconnect in another thread.
+          // handleStateChanged is triggered in ZkClient eventThread. The disconnect logic will
+          // interrupt this thread. This issue prevents the ZkClient.close() from complete. So the
+          // client is left in a strange state.
           disconnect();
         } catch (Exception ex) {
           LOG.error("Disconnect HelixManager is not completely done.", ex);
@@ -1094,7 +1100,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
       _participantManager.reset();
     }
     _participantManager =
-        new ParticipantManager(this, (ZkClient) _zkclient, _sessionTimeout, _liveInstanceInfoProvider,
+        new ParticipantManager(this, _zkclient, _sessionTimeout, _liveInstanceInfoProvider,
             _preConnectCallbacks);
     _participantManager.handleNewSession();
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
index 9c18d09..4d2a93d 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
@@ -1491,6 +1491,10 @@ public class ZkClient implements Watcher {
           throw new HelixException(
               "Unable to connect to zookeeper server with the specified ZkConnection");
         }
+        // TODO Refine the init state here. Here we pre-config it to be connected. This may not be
+        // the case, if the connection is connecting or recovering. -- JJ
+        // For shared client, the event notification will not be forwarded before wather add to the
+        // connection manager.
         setCurrentState(KeeperState.SyncConnected);
       }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 4fe732b..0225f83 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -25,7 +25,7 @@ import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.builder.CustomModeISBuilder;
@@ -84,13 +84,13 @@ public class TaskDriver {
         manager.getHelixPropertyStore(), manager.getClusterName());
   }
 
-  public TaskDriver(ZkClient client, String clusterName) {
+  public TaskDriver(HelixZkClient client, String clusterName) {
     this(client, new ZkBaseDataAccessor<ZNRecord>(client), clusterName);
   }
 
-  public TaskDriver(ZkClient client, ZkBaseDataAccessor<ZNRecord> baseAccessor, String clusterName) {
+  public TaskDriver(HelixZkClient client, ZkBaseDataAccessor<ZNRecord> baseAccessor, String clusterName) {
     this(new ZKHelixAdmin(client), new ZKHelixDataAccessor(clusterName, baseAccessor),
-        new ZkHelixPropertyStore<ZNRecord>(baseAccessor,
+        new ZkHelixPropertyStore<>(baseAccessor,
             PropertyPathBuilder.propertyStore(clusterName), null), clusterName);
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java
index af80b48..7129c9f 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java
@@ -29,7 +29,7 @@ import org.apache.helix.controller.stages.ClusterEvent;
 import org.apache.helix.controller.stages.ClusterEventType;
 import org.apache.helix.controller.stages.CurrentStateComputationStage;
 import org.apache.helix.controller.stages.ResourceComputationStage;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.Partition;
 import org.slf4j.Logger;
@@ -54,7 +54,7 @@ public class ClusterExternalViewVerifier extends ClusterVerifier {
 
   final List<String> _expectSortedLiveNodes; // always sorted
 
-  public ClusterExternalViewVerifier(ZkClient zkclient, String clusterName,
+  public ClusterExternalViewVerifier(HelixZkClient zkclient, String clusterName,
       List<String> expectLiveNodes) {
     super(zkclient, clusterName);
     _expectSortedLiveNodes = expectLiveNodes;

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/tools/ClusterLiveNodesVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterLiveNodesVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterLiveNodesVerifier.java
index d1187ab..164cfcc 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterLiveNodesVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterLiveNodesVerifier.java
@@ -19,11 +19,11 @@ package org.apache.helix.tools;
  * under the License.
  */
 
-import org.apache.helix.manager.zk.ZkClient;
-
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.helix.manager.zk.client.HelixZkClient;
+
 /**
  * Please use the class is in tools.ClusterVerifiers.
  */
@@ -32,7 +32,7 @@ public class ClusterLiveNodesVerifier extends ClusterVerifier {
 
   final List<String> _expectSortedLiveNodes; // always sorted
 
-  public ClusterLiveNodesVerifier(ZkClient zkclient, String clusterName,
+  public ClusterLiveNodesVerifier(HelixZkClient zkclient, String clusterName,
       List<String> expectLiveNodes) {
     super(zkclient, clusterName);
     _expectSortedLiveNodes = expectLiveNodes;

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
index 94a5f70..d21877f 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
@@ -45,7 +45,8 @@ import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ClusterConstraints;
@@ -135,22 +136,24 @@ public class ClusterSetup {
 
   static Logger _logger = LoggerFactory.getLogger(ClusterSetup.class);
   String _zkServerAddress;
-  ZkClient _zkClient;
+  HelixZkClient _zkClient;
   HelixAdmin _admin;
 
   public ClusterSetup(String zkServerAddress) {
     _zkServerAddress = zkServerAddress;
-    _zkClient = ZKClientPool.getZkClient(_zkServerAddress);
+    _zkClient = SharedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(_zkServerAddress));
+    _zkClient.setZkSerializer(new ZNRecordSerializer());
     _admin = new ZKHelixAdmin(_zkClient);
   }
 
-  public ClusterSetup(ZkClient zkClient) {
+  public ClusterSetup(HelixZkClient zkClient) {
     _zkServerAddress = zkClient.getServers();
     _zkClient = zkClient;
     _admin = new ZKHelixAdmin(_zkClient);
   }
 
-  public ClusterSetup(ZkClient zkClient, HelixAdmin zkHelixAdmin) {
+  public ClusterSetup(HelixZkClient zkClient, HelixAdmin zkHelixAdmin) {
     _zkServerAddress = zkClient.getServers();
     _zkClient = zkClient;
     _admin = zkHelixAdmin;

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
index cc508ef..cc16ce2 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
@@ -28,6 +28,7 @@ import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.collect.Sets;
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.exception.ZkNodeExistsException;
@@ -57,19 +58,19 @@ import org.apache.helix.controller.stages.ClusterEventType;
 import org.apache.helix.controller.stages.CurrentStateComputationStage;
 import org.apache.helix.controller.stages.ResourceComputationStage;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.DedicatedZkClientFactory;
+import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
 import org.apache.helix.task.TaskConstants;
-import org.apache.helix.util.ZKClientPool;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Sets;
-
 /**
  * This class is deprecated, please use dedicated verifier classes, such as BestPossibleExternViewVerifier, etc, in tools.ClusterVerifiers
  */
@@ -98,10 +99,11 @@ public class ClusterStateVerifier {
   @Deprecated
   static class ExtViewVeriferZkListener implements IZkChildListener, IZkDataListener {
     final CountDownLatch _countDown;
-    final ZkClient _zkClient;
+    final HelixZkClient _zkClient;
     final Verifier _verifier;
 
-    public ExtViewVeriferZkListener(CountDownLatch countDown, ZkClient zkClient, ZkVerifier verifier) {
+    public ExtViewVeriferZkListener(CountDownLatch countDown, HelixZkClient zkClient,
+        ZkVerifier verifier) {
       _countDown = countDown;
       _zkClient = zkClient;
       _verifier = verifier;
@@ -136,17 +138,20 @@ public class ClusterStateVerifier {
     }
   }
 
-  private static ZkClient validateAndGetClient(String zkAddr, String clusterName) {
+  private static HelixZkClient validateAndGetClient(String zkAddr, String clusterName) {
     if (zkAddr == null || clusterName == null) {
       throw new IllegalArgumentException("requires zkAddr|clusterName");
     }
-    return ZKClientPool.getZkClient(zkAddr);
+    HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
+    clientConfig.setZkSerializer(new ZNRecordSerializer());
+    return DedicatedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr), clientConfig);
   }
 
   public static class BestPossAndExtViewZkVerifier implements ZkVerifier {
     private final String clusterName;
     private final Map<String, Map<String, String>> errStates;
-    private final ZkClient zkClient;
+    private final HelixZkClient zkClient;
     private final Set<String> resources;
 
     public BestPossAndExtViewZkVerifier(String zkAddr, String clusterName) {
@@ -163,7 +168,7 @@ public class ClusterStateVerifier {
       this(validateAndGetClient(zkAddr, clusterName), clusterName, errStates, resources);
     }
 
-    public BestPossAndExtViewZkVerifier(ZkClient zkClient, String clusterName,
+    public BestPossAndExtViewZkVerifier(HelixZkClient zkClient, String clusterName,
         Map<String, Map<String, String>> errStates, Set<String> resources) {
       if (zkClient == null || clusterName == null) {
         throw new IllegalArgumentException("requires zkClient|clusterName");
@@ -406,7 +411,7 @@ public class ClusterStateVerifier {
 
     @Override
     public ZkClient getZkClient() {
-      return zkClient;
+      return (ZkClient) zkClient;
     }
 
     @Override
@@ -425,13 +430,13 @@ public class ClusterStateVerifier {
 
   public static class MasterNbInExtViewVerifier implements ZkVerifier {
     private final String clusterName;
-    private final ZkClient zkClient;
+    private final HelixZkClient zkClient;
 
     public MasterNbInExtViewVerifier(String zkAddr, String clusterName) {
       this(validateAndGetClient(zkAddr, clusterName), clusterName);
     }
 
-    public MasterNbInExtViewVerifier(ZkClient zkClient, String clusterName) {
+    public MasterNbInExtViewVerifier(HelixZkClient zkClient, String clusterName) {
       if (zkClient == null || clusterName == null) {
         throw new IllegalArgumentException("requires zkClient|clusterName");
       }
@@ -454,7 +459,7 @@ public class ClusterStateVerifier {
 
     @Override
     public ZkClient getZkClient() {
-      return zkClient;
+      return (ZkClient) zkClient;
     }
 
     @Override
@@ -555,7 +560,7 @@ public class ClusterStateVerifier {
   public static boolean verifyByZkCallback(ZkVerifier verifier, long timeout) {
     long startTime = System.currentTimeMillis();
     CountDownLatch countDown = new CountDownLatch(1);
-    ZkClient zkClient = verifier.getZkClient();
+    HelixZkClient zkClient = verifier.getZkClient();
     String clusterName = verifier.getClusterName();
 
     // add an ephemeral node to /{clusterName}/CONFIGS/CLUSTER/verify

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifier.java
index 5697bcf..d6e5a73 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifier.java
@@ -27,7 +27,7 @@ import org.apache.helix.ZNRecord;
 import org.apache.helix.api.listeners.PreFetch;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,7 +42,7 @@ import java.util.concurrent.TimeUnit;
 public abstract class ClusterVerifier implements IZkChildListener, IZkDataListener {
   private static Logger LOG = LoggerFactory.getLogger(ClusterVerifier.class);
 
-  protected final ZkClient _zkclient;
+  protected final HelixZkClient _zkclient;
   protected final String _clusterName;
   protected final HelixDataAccessor _accessor;
   protected final PropertyKey.Builder _keyBuilder;
@@ -58,7 +58,7 @@ public abstract class ClusterVerifier implements IZkChildListener, IZkDataListen
     }
   }
 
-  public ClusterVerifier(ZkClient zkclient, String clusterName) {
+  public ClusterVerifier(HelixZkClient zkclient, String clusterName) {
     _zkclient = zkclient;
     _clusterName = clusterName;
     _accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkclient));

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
index 6e73df6..3517444 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
@@ -33,6 +33,7 @@ import org.apache.helix.controller.stages.ClusterEventType;
 import org.apache.helix.controller.stages.CurrentStateComputationStage;
 import org.apache.helix.controller.stages.ResourceComputationStage;
 import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Partition;
@@ -73,7 +74,7 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier {
     _clusterDataCache = new ClusterDataCache();
   }
 
-  public BestPossibleExternalViewVerifier(ZkClient zkClient, String clusterName,
+  public BestPossibleExternalViewVerifier(HelixZkClient zkClient, String clusterName,
       Set<String> resources, Map<String, Map<String, String>> errStates,
       Set<String> expectLiveInstances) {
     super(zkClient, clusterName);
@@ -89,7 +90,7 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier {
     private Set<String> _resources;
     private Set<String> _expectLiveInstances;
     private String _zkAddr;
-    private ZkClient _zkClient;
+    private HelixZkClient _zkClient;
 
     public Builder(String clusterName) {
       _clusterName = clusterName;
@@ -148,11 +149,15 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier {
       return this;
     }
 
-    public ZkClient getZkClient() {
+    public HelixZkClient getHelixZkClient() {
       return _zkClient;
     }
 
-    public Builder setZkClient(ZkClient zkClient) {
+    @Deprecated
+    public ZkClient getZkClient() {
+      return (ZkClient) getHelixZkClient();
+    }
+    public Builder setZkClient(HelixZkClient zkClient) {
       _zkClient = zkClient;
       return this;
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ClusterLiveNodesVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ClusterLiveNodesVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ClusterLiveNodesVerifier.java
index 2a71566..b4d3862 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ClusterLiveNodesVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ClusterLiveNodesVerifier.java
@@ -24,16 +24,16 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
 
 public class ClusterLiveNodesVerifier extends ZkHelixClusterVerifier {
 
   final Set<String> _expectLiveNodes;
 
-  public ClusterLiveNodesVerifier(ZkClient zkclient, String clusterName,
+  public ClusterLiveNodesVerifier(HelixZkClient zkclient, String clusterName,
       List<String> expectLiveNodes) {
     super(zkclient, clusterName);
-    _expectLiveNodes = new HashSet<String>(expectLiveNodes);
+    _expectLiveNodes = new HashSet<>(expectLiveNodes);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java
index a1d12fa..e714789 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java
@@ -35,6 +35,7 @@ import org.apache.helix.PropertyKey;
 import org.apache.helix.controller.rebalancer.AbstractRebalancer;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
@@ -63,7 +64,7 @@ public class StrictMatchExternalViewVerifier extends ZkHelixClusterVerifier {
     _expectLiveInstances = expectLiveInstances;
   }
 
-  public StrictMatchExternalViewVerifier(ZkClient zkClient, String clusterName,
+  public StrictMatchExternalViewVerifier(HelixZkClient zkClient, String clusterName,
       Set<String> resources, Set<String> expectLiveInstances) {
     super(zkClient, clusterName);
     _resources = resources;
@@ -75,7 +76,7 @@ public class StrictMatchExternalViewVerifier extends ZkHelixClusterVerifier {
     private Set<String> _resources;
     private Set<String> _expectLiveInstances;
     private String _zkAddr;
-    private ZkClient _zkClient;
+    private HelixZkClient _zkClient;
 
     public StrictMatchExternalViewVerifier build() {
       if (_clusterName == null || (_zkAddr == null && _zkClient == null)) {
@@ -125,11 +126,16 @@ public class StrictMatchExternalViewVerifier extends ZkHelixClusterVerifier {
       return this;
     }
 
-    public ZkClient getZkClient() {
+    public HelixZkClient getHelixZkClient() {
       return _zkClient;
     }
 
-    public Builder setZkClient(ZkClient zkClient) {
+    @Deprecated
+    public ZkClient getZkClient() {
+      return (ZkClient) getHelixZkClient();
+    }
+
+    public Builder setZkClient(HelixZkClient zkClient) {
       _zkClient = zkClient;
       return this;
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
index dbf9272..9c24f51 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
@@ -29,9 +29,11 @@ import org.apache.helix.PropertyKey;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.api.listeners.PreFetch;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
-import org.apache.helix.util.ZKClientPool;
+import org.apache.helix.manager.zk.client.DedicatedZkClientFactory;
+import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,7 +48,7 @@ public abstract class ZkHelixClusterVerifier
   protected static int DEFAULT_PERIOD = 100;
 
 
-  protected final ZkClient _zkClient;
+  protected final HelixZkClient _zkClient;
   protected final String _clusterName;
   protected final HelixDataAccessor _accessor;
   protected final PropertyKey.Builder _keyBuilder;
@@ -90,7 +92,7 @@ public abstract class ZkHelixClusterVerifier
     }
   }
 
-  public ZkHelixClusterVerifier(ZkClient zkClient, String clusterName) {
+  public ZkHelixClusterVerifier(HelixZkClient zkClient, String clusterName) {
     if (zkClient == null || clusterName == null) {
       throw new IllegalArgumentException("requires zkClient|clusterName");
     }
@@ -104,7 +106,9 @@ public abstract class ZkHelixClusterVerifier
     if (zkAddr == null || clusterName == null) {
       throw new IllegalArgumentException("requires zkAddr|clusterName");
     }
-    _zkClient = ZKClientPool.getZkClient(zkAddr);
+    _zkClient = DedicatedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
+    _zkClient.setZkSerializer(new ZNRecordSerializer());
     _clusterName = clusterName;
     _accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
     _keyBuilder = _accessor.keyBuilder();
@@ -285,10 +289,15 @@ public abstract class ZkHelixClusterVerifier
     }
   }
 
-  public ZkClient getZkClient() {
+  public HelixZkClient getHelixZkClient() {
     return _zkClient;
   }
 
+  @Deprecated
+  public ZkClient getZkClient() {
+    return (ZkClient) getHelixZkClient();
+  }
+
   public String getClusterName() {
     return _clusterName;
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/tools/TestExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/TestExecutor.java b/helix-core/src/main/java/org/apache/helix/tools/TestExecutor.java
index 908bba5..6a757f3 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/TestExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/TestExecutor.java
@@ -34,7 +34,6 @@ import org.I0Itec.zkclient.exception.ZkNodeExistsException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.helix.manager.zk.client.SharedZkClientFactory;
 import org.apache.helix.store.PropertyJsonComparator;
@@ -536,7 +535,7 @@ public class TestExecutor {
     return result;
   }
 
-  private static boolean compareAndSetZnode(ZnodeValue expect, ZnodeOpArg arg, ZkClient zkClient,
+  private static boolean compareAndSetZnode(ZnodeValue expect, ZnodeOpArg arg, HelixZkClient zkClient,
       ZNRecord diff) {
     String path = arg._znodePath;
     ZnodePropertyType type = arg._propertyType;
@@ -639,12 +638,12 @@ public class TestExecutor {
   private static class ExecuteCommand implements Runnable {
     private final TestCommand _command;
     private final long _startTime;
-    private final ZkClient _zkClient;
+    private final HelixZkClient _zkClient;
     private final CountDownLatch _countDown;
     private final Map<TestCommand, Boolean> _testResults;
 
     public ExecuteCommand(long startTime, TestCommand command, CountDownLatch countDown,
-        ZkClient zkClient, Map<TestCommand, Boolean> testResults) {
+        HelixZkClient zkClient, Map<TestCommand, Boolean> testResults) {
       _startTime = startTime;
       _command = command;
       _countDown = countDown;
@@ -735,9 +734,7 @@ public class TestExecutor {
         }
         _countDown.countDown();
         if (_countDown.getCount() == 0) {
-          if (_zkClient != null && _zkClient.getConnection() != null)
-
-          {
+          if (_zkClient != null && !_zkClient.isClosed()) {
             _zkClient.close();
           }
         }
@@ -768,7 +765,7 @@ public class TestExecutor {
 
       TestTrigger trigger = command._trigger;
       command._startTimestamp = System.currentTimeMillis() + trigger._startTime;
-      new Thread(new ExecuteCommand(command._startTimestamp, command, countDown, (ZkClient) zkClient,
+      new Thread(new ExecuteCommand(command._startTimestamp, command, countDown, zkClient,
           testResults)).start();
     }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/tools/commandtools/IntegrationTestUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/commandtools/IntegrationTestUtil.java b/helix-core/src/main/java/org/apache/helix/tools/commandtools/IntegrationTestUtil.java
index dc2af8f..18f06f4 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/commandtools/IntegrationTestUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/commandtools/IntegrationTestUtil.java
@@ -35,11 +35,12 @@ import org.apache.commons.cli.ParseException;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.DedicatedZkClientFactory;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
 import org.apache.helix.tools.ClusterExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.ClusterLiveNodesVerifier;
-import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
 import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,11 +63,11 @@ public class IntegrationTestUtil {
   public static final String readLeader = "readLeader";
   public static final String verifyClusterState = "verifyClusterState";
 
-  final ZkClient _zkclient;
+  final HelixZkClient _zkclient;
   final ZNRecordSerializer _serializer;
   final long _timeoutValue;
 
-  public IntegrationTestUtil(ZkClient zkclient, long timeoutValue) {
+  public IntegrationTestUtil(HelixZkClient zkclient, long timeoutValue) {
     _zkclient = zkclient;
     _timeoutValue = timeoutValue;
     _serializer = new ZNRecordSerializer();
@@ -213,10 +214,10 @@ public class IntegrationTestUtil {
       System.exit(1);
     }
 
-    String zkServer = cmd.getOptionValue(zkSvr);
-    ZkClient zkclient =
-        new ZkClient(zkServer, ZkClient.DEFAULT_SESSION_TIMEOUT,
-            ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+    HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
+    clientConfig.setZkSerializer(new ZNRecordSerializer());
+    HelixZkClient zkClient = DedicatedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(cmd.getOptionValue(zkSvr)), clientConfig);
 
     long timeoutValue = DEFAULT_TIMEOUT;
     if (cmd.hasOption(timeout)) {
@@ -229,7 +230,7 @@ public class IntegrationTestUtil {
       }
     }
 
-    IntegrationTestUtil util = new IntegrationTestUtil(zkclient, timeoutValue);
+    IntegrationTestUtil util = new IntegrationTestUtil(zkClient, timeoutValue);
 
     if (cmd != null) {
       if (cmd.hasOption(verifyExternalView)) {

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/util/ZKClientPool.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/util/ZKClientPool.java b/helix-core/src/main/java/org/apache/helix/util/ZKClientPool.java
index 0980e48..3350d57 100644
--- a/helix-core/src/main/java/org/apache/helix/util/ZKClientPool.java
+++ b/helix-core/src/main/java/org/apache/helix/util/ZKClientPool.java
@@ -27,7 +27,7 @@ import org.apache.helix.manager.zk.ZkClient;
 import org.apache.zookeeper.ZooKeeper.States;
 
 public class ZKClientPool {
-  static final Map<String, ZkClient> _zkClientMap = new ConcurrentHashMap<String, ZkClient>();
+  static final Map<String, ZkClient> _zkClientMap = new ConcurrentHashMap<>();
   static final int DEFAULT_SESSION_TIMEOUT = 30 * 1000;
 
   public static ZkClient getZkClient(String zkServer) {

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/test/java/org/apache/helix/TestHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestHelper.java b/helix-core/src/test/java/org/apache/helix/TestHelper.java
index 2f25e30..edc0646 100644
--- a/helix-core/src/test/java/org/apache/helix/TestHelper.java
+++ b/helix-core/src/test/java/org/apache/helix/TestHelper.java
@@ -37,6 +37,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+
 import org.I0Itec.zkclient.IDefaultNameSpace;
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
@@ -50,7 +51,8 @@ import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState.RebalanceMode;
@@ -144,7 +146,7 @@ public class TestHelper {
     }
   }
 
-  public static void setupEmptyCluster(ZkClient zkClient, String clusterName) {
+  public static void setupEmptyCluster(HelixZkClient zkClient, String clusterName) {
     ZKHelixAdmin admin = new ZKHelixAdmin(zkClient);
     admin.addCluster(clusterName, true);
   }
@@ -211,7 +213,8 @@ public class TestHelper {
 
   public static boolean verifyEmptyCurStateAndExtView(String clusterName, String resourceName,
       Set<String> instanceNames, String zkAddr) {
-    ZkClient zkClient = new ZkClient(zkAddr);
+    HelixZkClient zkClient = SharedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
     zkClient.setZkSerializer(new ZNRecordSerializer());
 
     try {
@@ -257,17 +260,18 @@ public class TestHelper {
         RebalanceMode.SEMI_AUTO, doRebalance);
   }
 
-  public static void setupCluster(String clusterName, String ZkAddr, int startPort,
+  public static void setupCluster(String clusterName, String zkAddr, int startPort,
       String participantNamePrefix, String resourceNamePrefix, int resourceNb, int partitionNb,
       int nodesNb, int replica, String stateModelDef, RebalanceMode mode, boolean doRebalance)
       throws Exception {
-    ZkClient zkClient = new ZkClient(ZkAddr);
+    HelixZkClient zkClient = SharedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
     if (zkClient.exists("/" + clusterName)) {
       LOG.warn("Cluster already exists:" + clusterName + ". Deleting it");
       zkClient.deleteRecursively("/" + clusterName);
     }
 
-    ClusterSetup setupTool = new ClusterSetup(ZkAddr);
+    ClusterSetup setupTool = new ClusterSetup(zkAddr);
     setupTool.addCluster(clusterName, true);
 
     for (int i = 0; i < nodesNb; i++) {
@@ -286,12 +290,12 @@ public class TestHelper {
     zkClient.close();
   }
 
-  public static void dropCluster(String clusterName, ZkClient zkClient) throws Exception {
+  public static void dropCluster(String clusterName, HelixZkClient zkClient) throws Exception {
     ClusterSetup setupTool = new ClusterSetup(zkClient);
     dropCluster(clusterName, zkClient, setupTool);
   }
 
-  public static void dropCluster(String clusterName, ZkClient zkClient, ClusterSetup setup) {
+  public static void dropCluster(String clusterName, HelixZkClient zkClient, ClusterSetup setup) {
     String namespace = "/" + clusterName;
     if (zkClient.exists(namespace)) {
       try {
@@ -310,7 +314,8 @@ public class TestHelper {
    */
   public static void verifyState(String clusterName, String zkAddr,
       Map<String, Set<String>> stateMap, String state) {
-    ZkClient zkClient = new ZkClient(zkAddr);
+    HelixZkClient zkClient = SharedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
     zkClient.setZkSerializer(new ZNRecordSerializer());
 
     try {
@@ -513,7 +518,7 @@ public class TestHelper {
     System.out.println("END:Print cache");
   }
 
-  public static void readZkRecursive(String path, Map<String, ZNode> map, ZkClient zkclient) {
+  public static void readZkRecursive(String path, Map<String, ZNode> map, HelixZkClient zkclient) {
     try {
       Stat stat = new Stat();
       ZNRecord record = zkclient.readData(path, stat);
@@ -554,7 +559,7 @@ public class TestHelper {
   }
 
   public static boolean verifyZkCache(List<String> paths, BaseDataAccessor<ZNRecord> zkAccessor,
-      ZkClient zkclient, boolean needVerifyStat) {
+      HelixZkClient zkclient, boolean needVerifyStat) {
     // read everything
     Map<String, ZNode> zkMap = new HashMap<String, ZNode>();
     Map<String, ZNode> cache = new HashMap<String, ZNode>();
@@ -568,12 +573,12 @@ public class TestHelper {
   }
 
   public static boolean verifyZkCache(List<String> paths, Map<String, ZNode> cache,
-      ZkClient zkclient, boolean needVerifyStat) {
+      HelixZkClient zkclient, boolean needVerifyStat) {
     return verifyZkCache(paths, null, cache, zkclient, needVerifyStat);
   }
 
   public static boolean verifyZkCache(List<String> paths, List<String> pathsExcludeForStat,
-      Map<String, ZNode> cache, ZkClient zkclient, boolean needVerifyStat) {
+      Map<String, ZNode> cache, HelixZkClient zkclient, boolean needVerifyStat) {
     // read everything on zk under paths
     Map<String, ZNode> zkMap = new HashMap<String, ZNode>();
     for (String path : paths) {
@@ -799,7 +804,7 @@ public class TestHelper {
     return sb.toString();
   }
 
-  public static void printZkListeners(ZkClient client) throws Exception {
+  public static void printZkListeners(HelixZkClient client) throws Exception {
     Map<String, Set<IZkDataListener>> datalisteners = ZkTestHelper.getZkDataListener(client);
     Map<String, Set<IZkChildListener>> childListeners = ZkTestHelper.getZkChildListener(client);
 

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/test/java/org/apache/helix/TestHierarchicalDataStore.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestHierarchicalDataStore.java b/helix-core/src/test/java/org/apache/helix/TestHierarchicalDataStore.java
index 07aa4f4..bd123be 100644
--- a/helix-core/src/test/java/org/apache/helix/TestHierarchicalDataStore.java
+++ b/helix-core/src/test/java/org/apache/helix/TestHierarchicalDataStore.java
@@ -20,27 +20,29 @@ package org.apache.helix;
  */
 
 import java.io.FileFilter;
+
 import org.apache.helix.controller.HierarchicalDataHolder;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
 import org.testng.AssertJUnit;
 import org.testng.annotations.Test;
 
 public class TestHierarchicalDataStore extends ZkUnitTestBase {
-  protected static ZkClient _zkClientString = null;
+  protected static HelixZkClient _zkClient = null;
 
-  @Test(groups = {
-    "unitTest"
+  @Test(groups = { "unitTest"
   })
+
   public void testHierarchicalDataStore() {
-    _zkClientString = new ZkClient(ZK_ADDR, 1000, 3000);
+    _zkClient = SharedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR));
 
     String path = "/tmp/testHierarchicalDataStore";
     FileFilter filter = null;
-    // _zkClient.setZkSerializer(new ZNRecordSerializer());
 
-    _zkClientString.deleteRecursively(path);
+    _zkClient.deleteRecursively(path);
     HierarchicalDataHolder<ZNRecord> dataHolder =
-        new HierarchicalDataHolder<ZNRecord>(_zkClientString, path, filter);
+        new HierarchicalDataHolder<ZNRecord>(_zkClient, path, filter);
     dataHolder.print();
     AssertJUnit.assertFalse(dataHolder.refreshData());
 
@@ -69,12 +71,12 @@ public class TestHierarchicalDataStore extends ZkUnitTestBase {
   }
 
   private void set(String path, String data) {
-    _zkClientString.writeData(path, data);
+    _zkClient.writeData(path, data);
   }
 
   private void add(String path, String data) {
-    _zkClientString.createPersistent(path, true);
-    _zkClientString.writeData(path, data);
+    _zkClient.createPersistent(path, true);
+    _zkClient.writeData(path, data);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/test/java/org/apache/helix/TestZKCallback.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestZKCallback.java b/helix-core/src/test/java/org/apache/helix/TestZKCallback.java
index ee0a7c7..b80e4d6 100644
--- a/helix-core/src/test/java/org/apache/helix/TestZKCallback.java
+++ b/helix-core/src/test/java/org/apache/helix/TestZKCallback.java
@@ -29,8 +29,6 @@ import org.apache.helix.api.listeners.ExternalViewChangeListener;
 import org.apache.helix.api.listeners.IdealStateChangeListener;
 import org.apache.helix.api.listeners.LiveInstanceChangeListener;
 import org.apache.helix.api.listeners.MessageListener;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/test/java/org/apache/helix/TestZkBasis.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestZkBasis.java b/helix-core/src/test/java/org/apache/helix/TestZkBasis.java
index 8b25214..5ef27a5 100644
--- a/helix-core/src/test/java/org/apache/helix/TestZkBasis.java
+++ b/helix-core/src/test/java/org/apache/helix/TestZkBasis.java
@@ -20,6 +20,7 @@ package org.apache.helix;
  */
 
 import java.util.Collections;
+import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -29,6 +30,7 @@ import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -64,6 +66,83 @@ public class TestZkBasis extends ZkUnitTestBase {
     }
   }
 
+
+  @Test
+  public void testZkSessionExpiry() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    ZkClient client =
+        new ZkClient(ZK_ADDR, HelixZkClient.DEFAULT_SESSION_TIMEOUT,
+            HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+
+    String path = String.format("/%s", clusterName);
+    client.createEphemeral(path);
+    String oldSessionId = ZkTestHelper.getSessionId(client);
+    ZkTestHelper.expireSession(client);
+    String newSessionId = ZkTestHelper.getSessionId(client);
+    Assert.assertNotSame(newSessionId, oldSessionId);
+    Assert.assertFalse(client.exists(path), "Ephemeral znode should be gone after session expiry");
+    client.close();
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test
+  public void testCloseZkClient() {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    ZkClient client =
+        new ZkClient(ZK_ADDR, HelixZkClient.DEFAULT_SESSION_TIMEOUT,
+            HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+    String path = String.format("/%s", clusterName);
+    client.createEphemeral(path);
+
+    client.close();
+    Assert.assertFalse(_gZkClient.exists(path), "Ephemeral node: " + path
+        + " should be removed after ZkClient#close()");
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test
+  public void testCloseZkClientInZkClientEventThread() throws Exception {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    final CountDownLatch waitCallback = new CountDownLatch(1);
+    final ZkClient client =
+        new ZkClient(ZK_ADDR, HelixZkClient.DEFAULT_SESSION_TIMEOUT,
+            HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+    String path = String.format("/%s", clusterName);
+    client.createEphemeral(path);
+    client.subscribeDataChanges(path, new IZkDataListener() {
+
+      @Override
+      public void handleDataDeleted(String dataPath) throws Exception {
+      }
+
+      @Override
+      public void handleDataChange(String dataPath, Object data) throws Exception {
+        client.close();
+        waitCallback.countDown();
+      }
+    });
+
+    client.writeData(path, new ZNRecord("test"));
+    waitCallback.await();
+    Assert.assertFalse(_gZkClient.exists(path), "Ephemeral node: " + path
+        + " should be removed after ZkClient#close() in its own event-thread");
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+  }
+
   /**
    * test zk watchers are renewed automatically after session expiry
    * zookeeper-client side keeps all registered watchers see ZooKeeper.WatchRegistration.register()
@@ -76,14 +155,13 @@ public class TestZkBasis extends ZkUnitTestBase {
    */
   @Test
   public void testWatchRenew() throws Exception {
-
     String className = TestHelper.getTestClassName();
     String methodName = TestHelper.getTestMethodName();
     String testName = className + "_" + methodName;
 
     final ZkClient client =
-        new ZkClient(ZK_ADDR, ZkClient.DEFAULT_SESSION_TIMEOUT,
-            ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+        new ZkClient(ZK_ADDR, HelixZkClient.DEFAULT_SESSION_TIMEOUT,
+            HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
     // make sure "/testName/test" doesn't exist
     final String path = "/" + testName + "/test";
     client.delete(path);
@@ -127,8 +205,8 @@ public class TestZkBasis extends ZkUnitTestBase {
     String testName = className + "_" + methodName;
 
     final ZkClient client =
-        new ZkClient(ZK_ADDR, ZkClient.DEFAULT_SESSION_TIMEOUT,
-            ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+        new ZkClient(ZK_ADDR, HelixZkClient.DEFAULT_SESSION_TIMEOUT,
+            HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
     // make sure "/testName/test" doesn't exist
     final String path = "/" + testName + "/test";
     client.createPersistent(path, true);

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/test/java/org/apache/helix/TestZnodeModify.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestZnodeModify.java b/helix-core/src/test/java/org/apache/helix/TestZnodeModify.java
index b898b8c..e5851f5 100644
--- a/helix-core/src/test/java/org/apache/helix/TestZnodeModify.java
+++ b/helix-core/src/test/java/org/apache/helix/TestZnodeModify.java
@@ -25,7 +25,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.model.IdealState.IdealStateProperty;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.tools.TestCommand;
@@ -210,10 +209,8 @@ public class TestZnodeModify extends ZkUnitTestBase {
       public void run() {
         try {
           Thread.sleep(3000);
-          final ZkClient zkClient = new ZkClient(ZK_ADDR);
-          zkClient.setZkSerializer(new ZNRecordSerializer());
-          zkClient.createPersistent(pathChild1, true);
-          zkClient.writeData(pathChild1, record);
+          _gZkClient.createPersistent(pathChild1, true);
+          _gZkClient.writeData(pathChild1, record);
         } catch (InterruptedException e) {
           logger.error("Interrupted sleep", e);
         }
@@ -228,28 +225,19 @@ public class TestZnodeModify extends ZkUnitTestBase {
 
   }
 
-  ZkClient _zkClient;
-
   @BeforeClass()
   public void beforeClass() {
     System.out.println("START " + getShortClassName() + " at "
         + new Date(System.currentTimeMillis()));
-
-    _zkClient = new ZkClient(ZK_ADDR);
-    _zkClient.setZkSerializer(new ZNRecordSerializer());
-    if (_zkClient.exists(PREFIX)) {
-      _zkClient.deleteRecursively(PREFIX);
+    if (_gZkClient.exists(PREFIX)) {
+      _gZkClient.deleteRecursively(PREFIX);
     }
-
   }
 
   @AfterClass
   public void afterClass() {
-    _zkClient.close();
-
     System.out
         .println("END " + getShortClassName() + " at " + new Date(System.currentTimeMillis()));
-
   }
 
   private ZNRecord getExampleZNRecord() {
@@ -267,5 +255,4 @@ public class TestZnodeModify extends ZkUnitTestBase {
     record.setListField("TestDB_0", list);
     return record;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java b/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
index 32d1085..19cd2e8 100644
--- a/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
+++ b/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
@@ -32,6 +32,10 @@ import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
@@ -41,6 +45,7 @@ import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.helix.model.ExternalView;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
@@ -54,6 +59,7 @@ import org.testng.Assert;
 
 public class ZkTestHelper {
   private static Logger LOG = LoggerFactory.getLogger(ZkTestHelper.class);
+  private static ExecutorService _executor = Executors.newSingleThreadExecutor();
 
   static {
     // Logger.getRootLogger().setLevel(Level.DEBUG);
@@ -62,11 +68,12 @@ public class ZkTestHelper {
   /**
    * Simulate a zk state change by calling {@link ZkClient#process(WatchedEvent)} directly
    */
-  public static void simulateZkStateReconnected(ZkClient client) {
+  public static void simulateZkStateReconnected(HelixZkClient client) {
+    ZkClient zkClient = (ZkClient) client;
     WatchedEvent event = new WatchedEvent(EventType.None, KeeperState.Disconnected, null);
-    client.process(event);
+    zkClient.process(event);
     event = new WatchedEvent(EventType.None, KeeperState.SyncConnected, null);
-    client.process(event);
+    zkClient.process(event);
   }
 
   /**
@@ -74,19 +81,20 @@ public class ZkTestHelper {
    * @param client
    * @return
    */
-  public static String getSessionId(ZkClient client) {
-    ZkConnection connection = ((ZkConnection) client.getConnection());
+  public static String getSessionId(HelixZkClient client) {
+    ZkConnection connection = (ZkConnection) ((ZkClient) client).getConnection();
     ZooKeeper curZookeeper = connection.getZookeeper();
     return Long.toHexString(curZookeeper.getSessionId());
   }
 
   /**
    * Expire current zk session and wait for {@link IZkStateListener#handleNewSession()} invoked
-   * @param zkClient
+   * @param client
    * @throws Exception
    */
 
-  public static void disconnectSession(final ZkClient zkClient) throws Exception {
+  public static void disconnectSession(HelixZkClient client) throws Exception {
+    final ZkClient zkClient = (ZkClient) client;
     IZkStateListener listener = new IZkStateListener() {
       @Override
       public void handleStateChanged(KeeperState state) throws Exception {
@@ -96,7 +104,7 @@ public class ZkTestHelper {
       @Override
       public void handleNewSession() throws Exception {
         // make sure zkclient is connected again
-        zkClient.waitUntilConnected(ZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.SECONDS);
+        zkClient.waitUntilConnected(HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.SECONDS);
 
         ZkConnection connection = ((ZkConnection) zkClient.getConnection());
         ZooKeeper curZookeeper = connection.getZookeeper();
@@ -138,8 +146,9 @@ public class ZkTestHelper {
     LOG.info("After expiry. sessionId: " + Long.toHexString(curZookeeper.getSessionId()));
   }
 
-  public static void expireSession(final ZkClient zkClient) throws Exception {
+  public static void expireSession(HelixZkClient client) throws Exception {
     final CountDownLatch waitNewSession = new CountDownLatch(1);
+    final ZkClient zkClient = (ZkClient) client;
 
     IZkStateListener listener = new IZkStateListener() {
       @Override
@@ -150,7 +159,7 @@ public class ZkTestHelper {
       @Override
       public void handleNewSession() throws Exception {
         // make sure zkclient is connected again
-        zkClient.waitUntilConnected(ZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.SECONDS);
+        zkClient.waitUntilConnected(HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.SECONDS);
 
         ZkConnection connection = ((ZkConnection) zkClient.getConnection());
         ZooKeeper curZookeeper = connection.getZookeeper();
@@ -204,10 +213,11 @@ public class ZkTestHelper {
 
   /**
    * expire zk session asynchronously
-   * @param zkClient
+   * @param client
    * @throws Exception
    */
-  public static void asyncExpireSession(final ZkClient zkClient) throws Exception {
+  public static void asyncExpireSession(HelixZkClient client) throws Exception {
+    final ZkClient zkClient = (ZkClient) client;
     ZkConnection connection = ((ZkConnection) zkClient.getConnection());
     ZooKeeper curZookeeper = connection.getZookeeper();
     LOG.info("Before expiry. sessionId: " + Long.toHexString(curZookeeper.getSessionId()));
@@ -238,7 +248,7 @@ public class ZkTestHelper {
   /*
    * stateMap: partition->instance->state
    */
-  public static boolean verifyState(ZkClient zkclient, String clusterName, String resourceName,
+  public static boolean verifyState(HelixZkClient zkclient, String clusterName, String resourceName,
       Map<String, Map<String, String>> expectStateMap, String op) {
     boolean result = true;
     ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(zkclient);
@@ -382,9 +392,11 @@ public class ZkTestHelper {
     }
   }
 
-  public static Map<String, List<String>> getZkWatch(ZkClient client) throws Exception {
+  public static Map<String, List<String>> getZkWatch(HelixZkClient client) throws Exception {
     Map<String, List<String>> lists = new HashMap<String, List<String>>();
-    ZkConnection connection = ((ZkConnection) client.getConnection());
+    ZkClient zkClient = (ZkClient) client;
+
+    ZkConnection connection = ((ZkConnection) zkClient.getConnection());
     ZooKeeper zk = connection.getZookeeper();
 
     java.lang.reflect.Field field = getField(zk.getClass(), "watchManager");
@@ -406,14 +418,14 @@ public class ZkTestHelper {
     HashMap<String, Set<Watcher>> childWatches =
         (HashMap<String, Set<Watcher>>) field2.get(watchManager);
 
-    lists.put("dataWatches", new ArrayList<String>(dataWatches.keySet()));
-    lists.put("existWatches", new ArrayList<String>(existWatches.keySet()));
-    lists.put("childWatches", new ArrayList<String>(childWatches.keySet()));
+    lists.put("dataWatches", new ArrayList<>(dataWatches.keySet()));
+    lists.put("existWatches", new ArrayList<>(existWatches.keySet()));
+    lists.put("childWatches", new ArrayList<>(childWatches.keySet()));
 
     return lists;
   }
 
-  public static Map<String, Set<IZkDataListener>> getZkDataListener(ZkClient client)
+  public static Map<String, Set<IZkDataListener>> getZkDataListener(HelixZkClient client)
       throws Exception {
     java.lang.reflect.Field field = getField(client.getClass(), "_dataListener");
     field.setAccessible(true);
@@ -422,7 +434,7 @@ public class ZkTestHelper {
     return dataListener;
   }
 
-  public static Map<String, Set<IZkChildListener>> getZkChildListener(ZkClient client)
+  public static Map<String, Set<IZkChildListener>> getZkChildListener(HelixZkClient client)
       throws Exception {
     java.lang.reflect.Field field = getField(client.getClass(), "_childListener");
     field.setAccessible(true);
@@ -431,7 +443,7 @@ public class ZkTestHelper {
     return childListener;
   }
 
-  public static boolean tryWaitZkEventsCleaned(ZkClient zkclient) throws Exception {
+  public static boolean tryWaitZkEventsCleaned(HelixZkClient zkclient) throws Exception {
     java.lang.reflect.Field field = getField(zkclient.getClass(), "_eventThread");
     field.setAccessible(true);
     Object eventThread = field.get(zkclient);
@@ -456,4 +468,18 @@ public class ZkTestHelper {
     }
     return false;
   }
+
+  public static void injectExpire(HelixZkClient client)
+      throws ExecutionException, InterruptedException {
+    final ZkClient zkClient = (ZkClient) client;
+    Future future = _executor.submit(new Runnable() {
+      @Override
+      public void run() {
+        WatchedEvent event =
+            new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Expired, null);
+        zkClient.process(event);
+      }
+    });
+    future.get();
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
index c69744e..b0c44e1 100644
--- a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.logging.Level;
+
 import org.I0Itec.zkclient.IZkStateListener;
 import org.I0Itec.zkclient.ZkConnection;
 import org.I0Itec.zkclient.ZkServer;
@@ -56,6 +57,8 @@ import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.DedicatedZkClientFactory;
+import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ConfigScope;
@@ -72,7 +75,6 @@ import org.apache.helix.model.builder.ConfigScopeBuilder;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.helix.tools.StateModelConfigGenerator;
-import org.apache.helix.util.ZKClientPool;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
@@ -90,7 +92,7 @@ public class ZkTestBase {
   private static Logger LOG = LoggerFactory.getLogger(ZkTestBase.class);
 
   protected static ZkServer _zkServer;
-  protected static ZkClient _gZkClient;
+  protected static HelixZkClient _gZkClient;
   protected static ClusterSetup _gSetupTool;
   protected static BaseDataAccessor<ZNRecord> _baseAccessor;
 
@@ -113,17 +115,17 @@ public class ZkTestBase {
 
     _zkServer = TestHelper.startZkServer(ZK_ADDR);
     AssertJUnit.assertTrue(_zkServer != null);
-    ZKClientPool.reset();
 
-    _gZkClient = new ZkClient(ZK_ADDR);
-    _gZkClient.setZkSerializer(new ZNRecordSerializer());
+    HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
+    clientConfig.setZkSerializer(new ZNRecordSerializer());
+    _gZkClient = DedicatedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR), clientConfig);
     _gSetupTool = new ClusterSetup(_gZkClient);
     _baseAccessor = new ZkBaseDataAccessor<>(_gZkClient);
   }
 
   @AfterSuite
   public void afterSuite() {
-    ZKClientPool.reset();
     _gZkClient.close();
     TestHelper.stopZkServer(_zkServer);
   }
@@ -149,7 +151,7 @@ public class ZkTestBase {
     return this.getClass().getSimpleName();
   }
 
-  protected String getCurrentLeader(ZkClient zkClient, String clusterName) {
+  protected String getCurrentLeader(HelixZkClient zkClient, String clusterName) {
     ZKHelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient));
     Builder keyBuilder = accessor.keyBuilder();
@@ -161,7 +163,7 @@ public class ZkTestBase {
     return leader.getInstanceName();
   }
 
-  protected void stopCurrentLeader(ZkClient zkClient, String clusterName,
+  protected void stopCurrentLeader(HelixZkClient zkClient, String clusterName,
       Map<String, Thread> threadMap, Map<String, HelixManager> managerMap) {
     String leader = getCurrentLeader(zkClient, clusterName);
     Assert.assertTrue(leader != null);
@@ -202,7 +204,7 @@ public class ZkTestBase {
     new ConfigAccessor(_gZkClient).set(scope, "healthChange" + ".enabled", "" + true);
   }
 
-  protected void enablePersistBestPossibleAssignment(ZkClient zkClient, String clusterName,
+  protected void enablePersistBestPossibleAssignment(HelixZkClient zkClient, String clusterName,
       Boolean enabled) {
     ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
     ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
@@ -210,7 +212,7 @@ public class ZkTestBase {
     configAccessor.setClusterConfig(clusterName, clusterConfig);
   }
 
-  protected void enablePersistIntermediateAssignment(ZkClient zkClient, String clusterName,
+  protected void enablePersistIntermediateAssignment(HelixZkClient zkClient, String clusterName,
       Boolean enabled) {
     ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
     ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
@@ -218,7 +220,7 @@ public class ZkTestBase {
     configAccessor.setClusterConfig(clusterName, clusterConfig);
   }
 
-  protected void enableTopologyAwareRebalance(ZkClient zkClient, String clusterName,
+  protected void enableTopologyAwareRebalance(HelixZkClient zkClient, String clusterName,
       Boolean enabled) {
     ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
     ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
@@ -226,7 +228,7 @@ public class ZkTestBase {
     configAccessor.setClusterConfig(clusterName, clusterConfig);
   }
 
-  protected void enableDelayRebalanceInCluster(ZkClient zkClient, String clusterName,
+  protected void enableDelayRebalanceInCluster(HelixZkClient zkClient, String clusterName,
       boolean enabled) {
     ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
     ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
@@ -234,7 +236,7 @@ public class ZkTestBase {
     configAccessor.setClusterConfig(clusterName, clusterConfig);
   }
 
-  protected void enableDelayRebalanceInInstance(ZkClient zkClient, String clusterName,
+  protected void enableDelayRebalanceInInstance(HelixZkClient zkClient, String clusterName,
       String instanceName, boolean enabled) {
     ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
     InstanceConfig instanceConfig = configAccessor.getInstanceConfig(clusterName, instanceName);
@@ -274,7 +276,7 @@ public class ZkTestBase {
     }
   }
 
-  protected void setDelayTimeInCluster(ZkClient zkClient, String clusterName, long delay) {
+  protected void setDelayTimeInCluster(HelixZkClient zkClient, String clusterName, long delay) {
     ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
     ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
     clusterConfig.setRebalanceDelayTime(delay);
@@ -412,7 +414,7 @@ public class ZkTestBase {
     stage.postProcess();
   }
 
-  public void verifyInstance(ZkClient zkClient, String clusterName, String instance,
+  public void verifyInstance(HelixZkClient zkClient, String clusterName, String instance,
       boolean wantExists) {
     // String instanceConfigsPath = HelixUtil.getConfigPath(clusterName);
     String instanceConfigsPath = PropertyPathBuilder.instanceConfig(clusterName);
@@ -422,13 +424,13 @@ public class ZkTestBase {
     AssertJUnit.assertEquals(wantExists, zkClient.exists(instancePath));
   }
 
-  public void verifyResource(ZkClient zkClient, String clusterName, String resource,
+  public void verifyResource(HelixZkClient zkClient, String clusterName, String resource,
       boolean wantExists) {
     String resourcePath = PropertyPathBuilder.idealState(clusterName, resource);
     AssertJUnit.assertEquals(wantExists, zkClient.exists(resourcePath));
   }
 
-  public void verifyEnabled(ZkClient zkClient, String clusterName, String instance,
+  public void verifyEnabled(HelixZkClient zkClient, String clusterName, String instance,
       boolean wantEnabled) {
     ZKHelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient));
@@ -438,7 +440,7 @@ public class ZkTestBase {
     AssertJUnit.assertEquals(wantEnabled, config.getInstanceEnabled());
   }
 
-  public void verifyReplication(ZkClient zkClient, String clusterName, String resource, int repl) {
+  public void verifyReplication(HelixZkClient zkClient, String clusterName, String resource, int repl) {
     ZKHelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient));
     Builder keyBuilder = accessor.keyBuilder();
@@ -476,8 +478,10 @@ public class ZkTestBase {
     LOG.info("After session expiry sessionId = " + oldZookeeper.getSessionId());
   }
 
-  protected void simulateSessionExpiry(ZkClient zkClient)
+  protected void simulateSessionExpiry(HelixZkClient client)
       throws IOException, InterruptedException, IOException {
+    ZkClient zkClient = (ZkClient) client;
+
     IZkStateListener listener = new IZkStateListener() {
       @Override
       public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
@@ -667,17 +671,21 @@ public class ZkTestBase {
   protected static class EmptyZkVerifier implements ClusterStateVerifier.ZkVerifier {
     private final String _clusterName;
     private final String _resourceName;
-    private final ZkClient _zkClient;
+    private final HelixZkClient _zkClient;
 
     /**
      * Instantiate the verifier
-     * @param clusterName the cluster to verify
+     *
+     * @param clusterName  the cluster to verify
      * @param resourceName the resource to verify
      */
     public EmptyZkVerifier(String clusterName, String resourceName) {
       _clusterName = clusterName;
       _resourceName = resourceName;
-      _zkClient = ZKClientPool.getZkClient(ZK_ADDR);
+
+      _zkClient = DedicatedZkClientFactory.getInstance()
+          .buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR));
+      _zkClient.setZkSerializer(new ZNRecordSerializer());
     }
 
     @Override
@@ -717,7 +725,7 @@ public class ZkTestBase {
 
     @Override
     public ZkClient getZkClient() {
-      return _zkClient;
+      return (ZkClient) _gZkClient;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/test/java/org/apache/helix/integration/TestCorrectnessOnConnectivityLoss.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCorrectnessOnConnectivityLoss.java b/helix-core/src/test/java/org/apache/helix/integration/TestCorrectnessOnConnectivityLoss.java
index 3554207..a78e7bf 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestCorrectnessOnConnectivityLoss.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCorrectnessOnConnectivityLoss.java
@@ -20,6 +20,8 @@ package org.apache.helix.integration;
  */
 
 import com.google.common.collect.Maps;
+
+import java.lang.reflect.Method;
 import java.util.Map;
 import org.I0Itec.zkclient.ZkServer;
 import org.apache.helix.HelixManager;
@@ -50,11 +52,11 @@ public class TestCorrectnessOnConnectivityLoss {
   private ClusterControllerManager _controller;
 
   @BeforeMethod
-  public void beforeMethod() throws Exception {
-    _zkServer = TestHelper.startZkServer(ZK_ADDR);
+  public void beforeMethod(Method testMethod) throws Exception {
+    _zkServer = TestHelper.startZkServer(ZK_ADDR, null, false);
 
     String className = TestHelper.getTestClassName();
-    String methodName = TestHelper.getTestMethodName();
+    String methodName = testMethod.getName();
     _clusterName = className + "_" + methodName;
     TestHelper.setupCluster(_clusterName, ZK_ADDR, 12918, // participant start port
         "localhost", // participant host
@@ -71,6 +73,11 @@ public class TestCorrectnessOnConnectivityLoss {
     _controller.connect();
   }
 
+  @AfterMethod
+  public void afterMethod() {
+    TestHelper.stopZkServer(_zkServer);
+  }
+
   @Test
   public void testParticipant() throws Exception {
     Map<String, Integer> stateReachedCounts = Maps.newHashMap();
@@ -136,11 +143,6 @@ public class TestCorrectnessOnConnectivityLoss {
     }
   }
 
-  @AfterMethod
-  public void afterMethod() throws Exception {
-    TestHelper.stopZkServer(_zkServer);
-  }
-
   @StateModelInfo(initialState = "OFFLINE", states = {
       "MASTER", "SLAVE", "OFFLINE", "ERROR"
   })