You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/10/30 00:40:33 UTC

[1/2] helix git commit: Introduce a Helix ZkClient factory and use it to create new ZkClient instances in the critical Helix components.

Repository: helix
Updated Branches:
  refs/heads/master 281f5d1ec -> 7bb55742e


http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
index b57ca87..9c18d09 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
@@ -57,6 +57,7 @@ import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
@@ -70,8 +71,8 @@ public class ZkClient implements Watcher {
   private static Logger LOG = LoggerFactory.getLogger(ZkClient.class);
   private static long MAX_RECONNECT_INTERVAL_MS = 30000; // 30 seconds
 
-  protected final IZkConnection _connection;
-  protected final long _operationRetryTimeoutInMillis;
+  private final IZkConnection _connection;
+  private final long _operationRetryTimeoutInMillis;
   private final Map<String, Set<IZkChildListener>> _childListener =
       new ConcurrentHashMap<>();
   private final ConcurrentHashMap<String, Set<IZkDataListenerEntry>> _dataListener =
@@ -87,7 +88,6 @@ public class ZkClient implements Watcher {
   private PathBasedZkSerializer _pathBasedZkSerializer;
   private ZkClientMonitor _monitor;
 
-
   private class IZkDataListenerEntry {
     final IZkDataListener _dataListener;
     final boolean _prefetchData;
@@ -130,7 +130,6 @@ public class ZkClient implements Watcher {
     }
   }
 
-
   protected ZkClient(IZkConnection zkConnection, int connectionTimeout, long operationRetryTimeout,
       PathBasedZkSerializer zkSerializer, String monitorType, String monitorKey,
       String monitorInstanceName, boolean monitorRootPathOnly) {
@@ -140,6 +139,7 @@ public class ZkClient implements Watcher {
     _connection = zkConnection;
     _pathBasedZkSerializer = zkSerializer;
     _operationRetryTimeoutInMillis = operationRetryTimeout;
+
     connect(connectionTimeout, this);
 
     // initiate monitor
@@ -510,7 +510,7 @@ public class ZkClient implements Watcher {
       String actualPath = retryUntilConnected(new Callable<String>() {
         @Override
         public String call() throws Exception {
-          return _connection.create(path, data, acl, mode);
+          return getConnection().create(path, data, acl, mode);
         }
       });
       record(path, data, startT, ZkClientMonitor.AccessType.WRITE);
@@ -695,7 +695,7 @@ public class ZkClient implements Watcher {
       List<String> children = retryUntilConnected(new Callable<List<String>>() {
         @Override
         public List<String> call() throws Exception {
-          return _connection.getChildren(path, watch);
+          return getConnection().getChildren(path, watch);
         }
       });
       record(path, null, startT, ZkClientMonitor.AccessType.READ);
@@ -737,7 +737,7 @@ public class ZkClient implements Watcher {
       boolean exists = retryUntilConnected(new Callable<Boolean>() {
         @Override
         public Boolean call() throws Exception {
-          return _connection.exists(path, watch);
+          return getConnection().exists(path, watch);
         }
       });
       record(path, null, startT, ZkClientMonitor.AccessType.READ);
@@ -759,7 +759,7 @@ public class ZkClient implements Watcher {
       Stat stat = retryUntilConnected(new Callable<Stat>() {
         @Override
         public Stat call() throws Exception {
-          Stat stat = ((ZkConnection) _connection).getZookeeper().exists(path, false);
+          Stat stat = ((ZkConnection) getConnection()).getZookeeper().exists(path, false);
           return stat;
         }
       });
@@ -776,14 +776,14 @@ public class ZkClient implements Watcher {
     }
   }
 
-  private void processStateChanged(WatchedEvent event) {
+  protected void processStateChanged(WatchedEvent event) { // log, set current state, notify listeners, maybe reconnect
     LOG.info("zookeeper state changed (" + event.getState() + ")");
     setCurrentState(event.getState());
     if (getShutdownTrigger()) {
       return;
     }
     fireStateChangedEvent(event.getState());
-    if (event.getState() == KeeperState.Expired) {
+    if (isManagingZkConnection() && event.getState() == KeeperState.Expired) { // only managing clients reconnect
       reconnectOnExpiring();
     }
   }
@@ -794,7 +794,7 @@ public class ZkClient implements Watcher {
         new ExponentialBackoffStrategy(MAX_RECONNECT_INTERVAL_MS, true);
 
     Exception reconnectException = new ZkException("Shutdown triggered.");
-    while (!_closed) {
+    while (!isClosed()) {
       try {
         reconnect();
         fireNewSessionEvents();
@@ -820,6 +820,19 @@ public class ZkClient implements Watcher {
     fireSessionEstablishmentError(reconnectException);
   }
 
+  private void reconnect() { // close and reconnect the same connection under the event lock
+    getEventLock().lock();
+    try {
+      IZkConnection connection = getConnection();
+      connection.close();
+      connection.connect(this); // this ZkClient is registered as the watcher
+    } catch (InterruptedException e) {
+      throw new ZkInterruptedException(e);
+    } finally {
+      getEventLock().unlock();
+    }
+  }
+
   private void fireNewSessionEvents() {
     for (final IZkStateListener stateListener : _stateListener) {
       _eventThread.send(new ZkEvent("New session event sent to " + stateListener) {
@@ -831,7 +844,7 @@ public class ZkClient implements Watcher {
     }
   }
 
-  private void fireStateChangedEvent(final KeeperState state) {
+  protected void fireStateChangedEvent(final KeeperState state) {
     for (final IZkStateListener stateListener : _stateListener) {
       _eventThread.send(new ZkEvent("State changed to " + state + " sent to " + stateListener) {
 
@@ -1073,7 +1086,7 @@ public class ZkClient implements Watcher {
     }
     final long operationStartTime = System.currentTimeMillis();
     while (true) {
-      if (_closed) {
+      if (isClosed()) {
         throw new IllegalStateException("ZkClient already closed!");
       }
       try {
@@ -1147,7 +1160,7 @@ public class ZkClient implements Watcher {
 
           @Override
           public Object call() throws Exception {
-            _connection.delete(path);
+            getConnection().delete(path);
             return null;
           }
         });
@@ -1226,7 +1239,7 @@ public class ZkClient implements Watcher {
       data = retryUntilConnected(new Callable<byte[]>() {
 
         @Override public byte[] call() throws Exception {
-          return _connection.readData(path, stat, watch);
+          return getConnection().readData(path, stat, watch);
         }
       });
       record(path, data, startT, ZkClientMonitor.AccessType.READ);
@@ -1299,7 +1312,7 @@ public class ZkClient implements Watcher {
       checkDataSizeLimit(data);
       final Stat stat = (Stat) retryUntilConnected(new Callable<Object>() {
         @Override public Object call() throws Exception {
-          return _connection.writeDataReturnStat(path, data, expectedVersion);
+          return getConnection().writeDataReturnStat(path, data, expectedVersion);
         }
       });
       record(path, data, startT, ZkClientMonitor.AccessType.WRITE);
@@ -1326,7 +1339,7 @@ public class ZkClient implements Watcher {
     final byte[] data = (datat == null ? null : serialize(datat, path));
     retryUntilConnected(new Callable<Object>() {
       @Override public Object call() throws Exception {
-        ((ZkConnection) _connection).getZookeeper().create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE,
+        ((ZkConnection) getConnection()).getZookeeper().create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE,
             // Arrays.asList(DEFAULT_ACL),
             mode, cb, new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT,
                 data == null ? 0 : data.length, false));
@@ -1342,7 +1355,7 @@ public class ZkClient implements Watcher {
     final byte[] data = serialize(datat, path);
     retryUntilConnected(new Callable<Object>() {
       @Override public Object call() throws Exception {
-        ((ZkConnection) _connection).getZookeeper().setData(path, data, version, cb,
+        ((ZkConnection) getConnection()).getZookeeper().setData(path, data, version, cb,
             new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT,
                 data == null ? 0 : data.length, false));
         return null;
@@ -1354,7 +1367,7 @@ public class ZkClient implements Watcher {
     final long startT = System.currentTimeMillis();
     retryUntilConnected(new Callable<Object>() {
       @Override public Object call() throws Exception {
-        ((ZkConnection) _connection).getZookeeper().getData(path, null, cb,
+        ((ZkConnection) getConnection()).getZookeeper().getData(path, null, cb,
             new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, true));
         return null;
       }
@@ -1365,7 +1378,7 @@ public class ZkClient implements Watcher {
     final long startT = System.currentTimeMillis();
     retryUntilConnected(new Callable<Object>() {
       @Override public Object call() throws Exception {
-        ((ZkConnection) _connection).getZookeeper().exists(path, null, cb,
+        ((ZkConnection) getConnection()).getZookeeper().exists(path, null, cb,
             new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, true));
         return null;
       }
@@ -1376,7 +1389,7 @@ public class ZkClient implements Watcher {
     final long startT = System.currentTimeMillis();
     retryUntilConnected(new Callable<Object>() {
       @Override public Object call() throws Exception {
-        ((ZkConnection) _connection).getZookeeper().delete(path, -1, cb,
+        ((ZkConnection) getConnection()).getZookeeper().delete(path, -1, cb,
             new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, false));
         return null;
       }
@@ -1394,7 +1407,7 @@ public class ZkClient implements Watcher {
   public void watchForData(final String path) {
     retryUntilConnected(new Callable<Object>() {
       @Override public Object call() throws Exception {
-        _connection.exists(path, true);
+        getConnection().exists(path, true);
         return null;
       }
     });
@@ -1433,7 +1446,7 @@ public class ZkClient implements Watcher {
   public void addAuthInfo(final String scheme, final byte[] auth) {
     retryUntilConnected(new Callable<Object>() {
       @Override public Object call() throws Exception {
-        _connection.addAuthInfo(scheme, auth);
+        getConnection().addAuthInfo(scheme, auth);
         return null;
       }
     });
@@ -1453,22 +1466,34 @@ public class ZkClient implements Watcher {
    */
   public void connect(final long maxMsToWaitUntilConnected, Watcher watcher)
       throws ZkInterruptedException, ZkTimeoutException, IllegalStateException {
-    if (_closed) {
+    if (isClosed()) {
       throw new IllegalStateException("ZkClient already closed!");
     }
     boolean started = false;
     acquireEventLock();
     try {
       setShutdownTrigger(false);
-      _eventThread = new ZkEventThread(_connection.getServers());
+
+      IZkConnection zkConnection = getConnection();
+      _eventThread = new ZkEventThread(zkConnection.getServers());
       _eventThread.start();
-      _connection.connect(watcher);
 
-      LOG.debug("Awaiting connection to Zookeeper server");
-      if (!waitUntilConnected(maxMsToWaitUntilConnected, TimeUnit.MILLISECONDS)) {
-        throw new ZkTimeoutException(
-            "Unable to connect to zookeeper server within timeout: " + maxMsToWaitUntilConnected);
+      if (isManagingZkConnection()) {
+        zkConnection.connect(watcher);
+        LOG.debug("Awaiting connection to Zookeeper server");
+        if (!waitUntilConnected(maxMsToWaitUntilConnected, TimeUnit.MILLISECONDS)) {
+          throw new ZkTimeoutException(
+              "Unable to connect to zookeeper server within timeout: " + maxMsToWaitUntilConnected);
+        }
+      } else {
+        // if the client is not managing connection, the input connection is supposed to connect.
+        if (isConnectionClosed()) {
+          throw new HelixException(
+              "Unable to connect to zookeeper server with the specified ZkConnection");
+        }
+        setCurrentState(KeeperState.SyncConnected);
       }
+
       started = true;
     } finally {
       getEventLock().unlock();
@@ -1484,7 +1509,7 @@ public class ZkClient implements Watcher {
   public long getCreationTime(String path) {
     acquireEventLock();
     try {
-      return _connection.getCreateTime(path);
+      return getConnection().getCreateTime(path);
     } catch (KeeperException e) {
       throw ZkException.create(e);
     } catch (InterruptedException e) {
@@ -1495,7 +1520,7 @@ public class ZkClient implements Watcher {
   }
 
   public String getServers() {
-    return _connection.getServers();
+    return getConnection().getServers(); // delegates to the current IZkConnection
   }
 
   /**
@@ -1509,15 +1534,18 @@ public class ZkClient implements Watcher {
       LOG.trace("closing a zkclient. callStack: " + Arrays.asList(calls));
     }
     getEventLock().lock();
+    IZkConnection connection = getConnection();
     try {
-      if (_connection == null || _closed) {
+      if (connection == null || _closed) {
         return;
       }
-      LOG.info("Closing zkclient: " + ((ZkConnection) _connection).getZookeeper());
       setShutdownTrigger(true);
       _eventThread.interrupt();
       _eventThread.join(2000);
-      _connection.close();
+      if (isManagingZkConnection()) {
+        LOG.info("Closing zkclient: " + ((ZkConnection) connection).getZookeeper());
+        connection.close();
+      }
       _closed = true;
 
       // send state change notification to unlock any wait
@@ -1529,7 +1557,7 @@ public class ZkClient implements Watcher {
        * Workaround for HELIX-264: calling ZkClient#close() in its own eventThread context will
        * throw ZkInterruptedException and skip ZkConnection#close()
        */
-      if (_connection != null) {
+      if (connection != null) {
         try {
           /**
            * ZkInterruptedException#construct() honors InterruptedException by calling
@@ -1537,7 +1565,9 @@ public class ZkClient implements Watcher {
            * zk-connection
            */
           Thread.interrupted();
-          _connection.close();
+          if (isManagingZkConnection()) {
+            connection.close();
+          }
           /**
            * restore interrupted status of current thread
            */
@@ -1556,26 +1586,20 @@ public class ZkClient implements Watcher {
   }
 
   public boolean isClosed() {
-    return _closed;
-  }
-
-  public boolean isConnectionClosed() {
-    return (_connection == null || _connection.getZookeeperState() == null ||
-        !_connection.getZookeeperState().isAlive());
-  }
-
-  private void reconnect() {
     getEventLock().lock();
     try {
-      _connection.close();
-      _connection.connect(this);
-    } catch (InterruptedException e) {
-      throw new ZkInterruptedException(e);
+      return _closed; // read under the event lock; close() sets _closed while holding it
     } finally {
       getEventLock().unlock();
     }
   }
 
+  public boolean isConnectionClosed() { // no connection, no state, or state not alive
+    IZkConnection connection = getConnection();
+    ZooKeeper.States state = connection == null ? null : connection.getZookeeperState();
+    return state == null || !state.isAlive(); // one state read, so both checks agree
+  }
+
   public void setShutdownTrigger(boolean triggerState) {
     _shutdownTriggered = triggerState;
   }
@@ -1605,11 +1629,29 @@ public class ZkClient implements Watcher {
     return retryUntilConnected(new Callable<List<OpResult>>() {
 
       @Override public List<OpResult> call() throws Exception {
-        return _connection.multi(ops);
+        return getConnection().multi(ops);
       }
     });
   }
 
+  /** Whether this client connects, reconnects on session expiry, and closes its own ZkConnection.
+   * @return true if this ZkClient is managing the ZkConnection; this base class always returns true.
+   */
+  protected boolean isManagingZkConnection() {
+    return true;
+  }
+
+  public long getSessionId() { // session id of the current ZooKeeper handle
+    ZkConnection zkConnection = (ZkConnection) getConnection();
+    ZooKeeper zk = zkConnection.getZookeeper();
+    if (zk == null) {
+      throw new HelixException(
+          "ZooKeeper connection information is not available now. ZkClient might be disconnected.");
+    }
+    // Reuse the handle checked above; a second getZookeeper() call may see a different/null one.
+    return zk.getSessionId();
+  }
+
   // operations to update monitor's counters
   private void record(String path, byte[] data, long startTimeMilliSec, ZkClientMonitor.AccessType accessType) {
     if (_monitor != null) {

http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java b/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java
index d458c52..794e9e1 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java
@@ -31,7 +31,8 @@ import org.apache.helix.ZNRecord;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.slf4j.Logger;
@@ -129,14 +130,15 @@ public class HelixCustomCodeRunner {
 
     StateMachineEngine stateMach = _manager.getStateMachineEngine();
     stateMach.registerStateModelFactory(LEADER_STANDBY, _stateModelFty, _resourceName);
-    ZkClient zkClient = null;
+    HelixZkClient zkClient = null;
     try {
       // manually add ideal state for participant leader using LeaderStandby
       // model
+      HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
+      clientConfig.setZkSerializer(new ZNRecordSerializer());
+      zkClient = SharedZkClientFactory
+          .getInstance().buildZkClient(new HelixZkClient.ZkConnectionConfig(_zkAddr), clientConfig);
 
-      zkClient =
-          new ZkClient(_zkAddr, ZkClient.DEFAULT_SESSION_TIMEOUT,
-              ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
       HelixDataAccessor accessor =
           new ZKHelixDataAccessor(_manager.getClusterName(), new ZkBaseDataAccessor<ZNRecord>(
               zkClient));
@@ -161,9 +163,7 @@ public class HelixCustomCodeRunner {
       LOG.info("Set idealState for participantLeader:" + _resourceName + ", idealState:"
           + idealState);
     } finally {
-      if (zkClient != null && zkClient.getConnection() != null)
-
-      {
+      if (zkClient != null && !zkClient.isClosed()) {
         zkClient.close();
       }
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/main/java/org/apache/helix/tools/MessagePoster.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/MessagePoster.java b/helix-core/src/main/java/org/apache/helix/tools/MessagePoster.java
index 80a7820..b1d6582 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/MessagePoster.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/MessagePoster.java
@@ -24,15 +24,17 @@ import java.util.UUID;
 import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.manager.zk.ZkClient;
-import org.apache.helix.model.Message;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
 import org.apache.helix.model.LiveInstance.LiveInstanceProperty;
+import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageState;
 import org.apache.helix.model.Message.MessageType;
 
 public class MessagePoster {
   public void post(String zkServer, Message message, String clusterName, String instanceName) {
-    ZkClient client = new ZkClient(zkServer);
+    HelixZkClient client = SharedZkClientFactory.getInstance().buildZkClient(new HelixZkClient.ZkConnectionConfig(
+        zkServer));
     client.setZkSerializer(new ZNRecordSerializer());
     String path = PropertyPathBuilder.instanceMessage(clusterName, instanceName, message.getId());
     client.delete(path);

http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/main/java/org/apache/helix/tools/TestExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/TestExecutor.java b/helix-core/src/main/java/org/apache/helix/tools/TestExecutor.java
index dd6f3a9..908bba5 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/TestExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/TestExecutor.java
@@ -35,13 +35,15 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
 import org.apache.helix.store.PropertyJsonComparator;
 import org.apache.helix.store.PropertyJsonSerializer;
 import org.apache.helix.store.PropertyStoreException;
 import org.apache.helix.tools.TestCommand.CommandType;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.zookeeper.data.Stat;
 
 /**
  * a test is structured logically as a list of commands a command has three parts: COMMAND
@@ -747,10 +749,11 @@ public class TestExecutor {
       String zkAddr, CountDownLatch countDown) {
 
     final Map<TestCommand, Boolean> testResults = new ConcurrentHashMap<TestCommand, Boolean>();
-    ZkClient zkClient = null;
 
-    zkClient = new ZkClient(zkAddr, ZkClient.DEFAULT_CONNECTION_TIMEOUT);
-    zkClient.setZkSerializer(new ZNRecordSerializer());
+    HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
+    clientConfig.setZkSerializer(new ZNRecordSerializer());
+    HelixZkClient zkClient = SharedZkClientFactory
+        .getInstance().buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr), clientConfig);
 
     // sort on trigger's start time, stable sort
     Collections.sort(commandList, new Comparator<TestCommand>() {
@@ -765,7 +768,7 @@ public class TestExecutor {
 
       TestTrigger trigger = command._trigger;
       command._startTimestamp = System.currentTimeMillis() + trigger._startTime;
-      new Thread(new ExecuteCommand(command._startTimestamp, command, countDown, zkClient,
+      new Thread(new ExecuteCommand(command._startTimestamp, command, countDown, (ZkClient) zkClient,
           testResults)).start();
     }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZKDumper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZKDumper.java b/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZKDumper.java
index c171b73..cf7f22e 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZKDumper.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZKDumper.java
@@ -38,14 +38,15 @@ import org.apache.commons.cli.OptionGroup;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
 import org.apache.helix.manager.zk.ByteArraySerializer;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
 
 /**
  * Dumps the Zookeeper file structure on to Disk
  */
 @SuppressWarnings("static-access")
 public class ZKDumper {
-  private ZkClient client;
+  private HelixZkClient client;
   private FilenameFilter filter;
   static Options options;
   private String suffix = "";
@@ -110,7 +111,8 @@ public class ZKDumper {
   }
 
   public ZKDumper(String zkAddress) {
-    client = new ZkClient(zkAddress, ZkClient.DEFAULT_CONNECTION_TIMEOUT);
+    client = SharedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress));
 
     ZkSerializer zkSerializer = new ByteArraySerializer();
     client.setZkSerializer(zkSerializer);

http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZkCopy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZkCopy.java b/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZkCopy.java
index 5bd955a..805847c 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZkCopy.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZkCopy.java
@@ -37,11 +37,12 @@ import org.apache.helix.AccessOption;
 import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.manager.zk.ByteArraySerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
 import org.apache.zookeeper.common.PathUtils;
 import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Tool for copying a zk/file path to another zk/file path
@@ -99,7 +100,7 @@ public class ZkCopy {
    * @param dstRootPath
    * @param paths
    */
-  private static void copy(ZkClient srcClient, String srcRootPath, ZkClient dstClient,
+  private static void copy(HelixZkClient srcClient, String srcRootPath, HelixZkClient dstClient,
       String dstRootPath, List<String> paths) {
     BaseDataAccessor<Object> srcAccessor = new ZkBaseDataAccessor<Object>(srcClient);
     List<String> readPaths = new ArrayList<String>();
@@ -146,7 +147,8 @@ public class ZkCopy {
     }
   }
 
-  private static void zkCopy(ZkClient srcClient, String srcRootPath, ZkClient dstClient, String dstRootPath) {
+  private static void zkCopy(HelixZkClient srcClient, String srcRootPath, HelixZkClient dstClient,
+      String dstRootPath) {
     // Strip off tailing "/"
     if (!srcRootPath.equals("/") && srcRootPath.endsWith("/")) {
       srcRootPath = srcRootPath.substring(0, srcRootPath.length() - 1);
@@ -218,21 +220,21 @@ public class ZkCopy {
       String srcZkAddr = srcUri.getAuthority();
       String dstZkAddr = dstUri.getAuthority();
 
-      ZkClient srcClient = null;
-      ZkClient dstClient = null;
+      HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
+      HelixZkClient srcClient = null;
+      HelixZkClient dstClient = null;
       try {
         if (srcZkAddr.equals(dstZkAddr)) {
-          srcClient =
-              dstClient =
-                  new ZkClient(srcZkAddr, ZkClient.DEFAULT_SESSION_TIMEOUT,
-                      ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ByteArraySerializer());
+          clientConfig.setZkSerializer(new ByteArraySerializer());
+          srcClient = dstClient = SharedZkClientFactory.getInstance()
+              .buildZkClient(new HelixZkClient.ZkConnectionConfig(srcZkAddr), clientConfig);
         } else {
-          srcClient =
-              new ZkClient(srcZkAddr, ZkClient.DEFAULT_SESSION_TIMEOUT,
-                  ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ByteArraySerializer());
-          dstClient =
-              new ZkClient(dstZkAddr, ZkClient.DEFAULT_SESSION_TIMEOUT,
-                  ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ByteArraySerializer());
+          clientConfig.setZkSerializer(new ByteArraySerializer());
+          srcClient = SharedZkClientFactory.getInstance()
+              .buildZkClient(new HelixZkClient.ZkConnectionConfig(srcZkAddr), clientConfig);
+          clientConfig.setZkSerializer(new ByteArraySerializer());
+          dstClient = SharedZkClientFactory.getInstance()
+              .buildZkClient(new HelixZkClient.ZkConnectionConfig(dstZkAddr), clientConfig);
         }
         String srcPath = srcUri.getPath();
         String dstPath = dstUri.getPath();

http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java b/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java
index d3f447a..63d87eb 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java
@@ -427,7 +427,7 @@ public class TestResourceGroupEndtoEnd extends ZkTestBase {
 
     @Override
     public ZkClient getZkClient() {
-      return _zkclient;
+      return (ZkClient) _zkclient; // NOTE(review): ClassCastException if _zkclient is not a ZkClient - confirm
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
index ffa2cb2..96f4a88 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
@@ -89,7 +89,7 @@ public class ClusterControllerManager extends ZKHelixManager implements Runnable
 
   @Override
   public ZkClient getZkClient() {
-    return _zkclient;
+    return (ZkClient) _zkclient; // NOTE(review): ClassCastException if _zkclient is not a ZkClient - confirm
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java
index 1cce08d..b186a1a 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java
@@ -83,7 +83,7 @@ public class ClusterDistributedController extends ZKHelixManager implements Runn
 
   @Override
   public ZkClient getZkClient() {
-    return _zkclient;
+    return (ZkClient) _zkclient; // NOTE(review): ClassCastException if _zkclient is not a ZkClient - confirm
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
index 2bd2630..362709a 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
@@ -128,7 +128,7 @@ public class MockParticipantManager extends ZKHelixManager implements Runnable,
 
   @Override
   public ZkClient getZkClient() {
-    return _zkclient;
+    return (ZkClient) _zkclient;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
new file mode 100644
index 0000000..d0cf004
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
@@ -0,0 +1,294 @@
+package org.apache.helix.manager.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.lang.management.ManagementFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.I0Itec.zkclient.IZkStateListener;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.helix.HelixException;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.monitoring.mbeans.MBeanRegistrar;
+import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
+import org.apache.helix.monitoring.mbeans.ZkClientMonitor;
+import org.apache.helix.monitoring.mbeans.ZkClientPathMonitor;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestRawZkClient extends ZkUnitTestBase {
+  private static Logger LOG = LoggerFactory.getLogger(TestRawZkClient.class);
+
+  ZkClient _zkClient;
+
+  @BeforeClass
+  public void beforeClass() {
+    _zkClient = new ZkClient(ZK_ADDR);
+    _zkClient.setZkSerializer(new ZNRecordSerializer());
+  }
+
+  @AfterClass
+  public void afterClass() {
+    _zkClient.close();
+  }
+
+  @Test()
+  void testGetStat() {
+    String path = "/tmp/getStatTest";
+    _zkClient.deleteRecursively(path);
+
+    Stat stat, newStat;
+    stat = _zkClient.getStat(path);
+    AssertJUnit.assertNull(stat);
+    _zkClient.createPersistent(path, true);
+
+    stat = _zkClient.getStat(path);
+    AssertJUnit.assertNotNull(stat);
+
+    newStat = _zkClient.getStat(path);
+    AssertJUnit.assertEquals(stat, newStat);
+
+    _zkClient.writeData(path, new ZNRecord("Test"));
+    newStat = _zkClient.getStat(path);
+    AssertJUnit.assertNotSame(stat, newStat);
+  }
+
+  @Test()
+  void testSessionExpire() throws Exception {
+    IZkStateListener listener = new IZkStateListener() {
+
+      @Override
+      public void handleStateChanged(KeeperState state) throws Exception {
+        System.out.println("In Old connection New state " + state);
+      }
+
+      @Override
+      public void handleNewSession() throws Exception {
+        System.out.println("In Old connection New session");
+      }
+
+      @Override
+      public void handleSessionEstablishmentError(Throwable var1) throws Exception {
+      }
+    };
+
+    _zkClient.subscribeStateChanges(listener);
+    ZkConnection connection = ((ZkConnection) _zkClient.getConnection());
+    ZooKeeper zookeeper = connection.getZookeeper();
+    System.out.println("old sessionId= " + zookeeper.getSessionId());
+    Watcher watcher = new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        System.out.println("In New connection In process event:" + event);
+      }
+    };
+    ZooKeeper newZookeeper =
+        new ZooKeeper(connection.getServers(), zookeeper.getSessionTimeout(), watcher,
+            zookeeper.getSessionId(), zookeeper.getSessionPasswd());
+    Thread.sleep(3000);
+    System.out.println("New sessionId= " + newZookeeper.getSessionId());
+    Thread.sleep(3000);
+    newZookeeper.close();
+    Thread.sleep(10000);
+    connection = ((ZkConnection) _zkClient.getConnection());
+    zookeeper = connection.getZookeeper();
+    System.out.println("After session expiry sessionId= " + zookeeper.getSessionId());
+  }
+
+  @Test(expectedExceptions = HelixException.class, expectedExceptionsMessageRegExp = "Data size larger than 1M.*")
+  void testDataSizeLimit() {
+    ZNRecord data = new ZNRecord(new String(new char[1024 * 1024]));
+    _zkClient.writeData("/test", data, -1);
+  }
+
+  @Test
+  public void testZkClientMonitor() throws Exception {
+    final String TEST_TAG = "test_monitor";
+    final String TEST_KEY = "test_key";
+    final String TEST_DATA = "testData";
+    final String TEST_ROOT = "/my_cluster/IDEALSTATES";
+    final String TEST_NODE = "/test_zkclient_monitor";
+    final String TEST_PATH = TEST_ROOT + TEST_NODE;
+
+    ZkClient.Builder builder = new ZkClient.Builder();
+    builder.setZkServer(ZK_ADDR).setMonitorKey(TEST_KEY).setMonitorType(TEST_TAG)
+        .setMonitorRootPathOnly(false);
+    ZkClient zkClient = builder.build();
+
+    final long TEST_DATA_SIZE = zkClient.serialize(TEST_DATA, TEST_PATH).length;
+
+    if (_zkClient.exists(TEST_PATH)) {
+      _zkClient.delete(TEST_PATH);
+    }
+    if (!_zkClient.exists(TEST_ROOT)) {
+      _zkClient.createPersistent(TEST_ROOT, true);
+    }
+
+    MBeanServer beanServer = ManagementFactory.getPlatformMBeanServer();
+
+    ObjectName name = MBeanRegistrar
+        .buildObjectName(MonitorDomainNames.HelixZkClient.name(), ZkClientMonitor.MONITOR_TYPE,
+            TEST_TAG, ZkClientMonitor.MONITOR_KEY, TEST_KEY);
+    ObjectName rootname = MBeanRegistrar
+        .buildObjectName(MonitorDomainNames.HelixZkClient.name(), ZkClientMonitor.MONITOR_TYPE,
+            TEST_TAG, ZkClientMonitor.MONITOR_KEY, TEST_KEY, ZkClientPathMonitor.MONITOR_PATH,
+            "Root");
+    ObjectName idealStatename = MBeanRegistrar
+        .buildObjectName(MonitorDomainNames.HelixZkClient.name(), ZkClientMonitor.MONITOR_TYPE,
+            TEST_TAG, ZkClientMonitor.MONITOR_KEY, TEST_KEY, ZkClientPathMonitor.MONITOR_PATH,
+            "IdealStates");
+    Assert.assertTrue(beanServer.isRegistered(rootname));
+    Assert.assertTrue(beanServer.isRegistered(idealStatename));
+
+    // Test exists
+    Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 0);
+    Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadTotalLatencyCounter"), 0);
+    Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadLatencyGauge.Max"), 0);
+    zkClient.exists(TEST_ROOT);
+    Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 1);
+    Assert.assertTrue((long) beanServer.getAttribute(rootname, "ReadTotalLatencyCounter") >= 0);
+    Assert.assertTrue((long) beanServer.getAttribute(rootname, "ReadLatencyGauge.Max") >= 0);
+
+    // Test create
+    Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteCounter"), 0);
+    Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteBytesCounter"), 0);
+    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteCounter"), 0);
+    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteBytesCounter"), 0);
+    Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteTotalLatencyCounter"), 0);
+    Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteLatencyGauge.Max"), 0);
+    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteTotalLatencyCounter"),
+        0);
+    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteLatencyGauge.Max"), 0);
+    zkClient.create(TEST_PATH, TEST_DATA, CreateMode.PERSISTENT);
+    Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteCounter"), 1);
+    Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteBytesCounter"),
+        TEST_DATA_SIZE);
+    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteCounter"), 1);
+    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteBytesCounter"),
+        TEST_DATA_SIZE);
+    long origWriteTotalLatencyCounter =
+        (long) beanServer.getAttribute(rootname, "WriteTotalLatencyCounter");
+    Assert.assertTrue(origWriteTotalLatencyCounter >= 0);
+    Assert.assertTrue((long) beanServer.getAttribute(rootname, "WriteLatencyGauge.Max") >= 0);
+    long origIdealStatesWriteTotalLatencyCounter =
+        (long) beanServer.getAttribute(idealStatename, "WriteTotalLatencyCounter");
+    Assert.assertTrue(origIdealStatesWriteTotalLatencyCounter >= 0);
+    Assert.assertTrue((long) beanServer.getAttribute(idealStatename, "WriteLatencyGauge.Max") >= 0);
+
+    // Test read
+    Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 1);
+    Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadBytesCounter"), 0);
+    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadCounter"), 0);
+    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadBytesCounter"), 0);
+    long origReadTotalLatencyCounter =
+        (long) beanServer.getAttribute(rootname, "ReadTotalLatencyCounter");
+    long origIdealStatesReadTotalLatencyCounter =
+        (long) beanServer.getAttribute(idealStatename, "ReadTotalLatencyCounter");
+    Assert.assertEquals(origIdealStatesReadTotalLatencyCounter, 0);
+    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadLatencyGauge.Max"), 0);
+    zkClient.readData(TEST_PATH, new Stat());
+    Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 2);
+    Assert
+        .assertEquals((long) beanServer.getAttribute(rootname, "ReadBytesCounter"), TEST_DATA_SIZE);
+    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadCounter"), 1);
+    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadBytesCounter"),
+        TEST_DATA_SIZE);
+    Assert.assertTrue((long) beanServer.getAttribute(rootname, "ReadTotalLatencyCounter")
+        >= origReadTotalLatencyCounter);
+    Assert.assertTrue((long) beanServer.getAttribute(idealStatename, "ReadTotalLatencyCounter")
+        >= origIdealStatesReadTotalLatencyCounter);
+    Assert.assertTrue((long) beanServer.getAttribute(idealStatename, "ReadLatencyGauge.Max") >= 0);
+    zkClient.getChildren(TEST_PATH);
+    Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 3);
+    Assert
+        .assertEquals((long) beanServer.getAttribute(rootname, "ReadBytesCounter"), TEST_DATA_SIZE);
+    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadCounter"), 2);
+    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadBytesCounter"),
+        TEST_DATA_SIZE);
+    zkClient.getStat(TEST_PATH);
+    Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 4);
+    Assert
+        .assertEquals((long) beanServer.getAttribute(rootname, "ReadBytesCounter"), TEST_DATA_SIZE);
+    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadCounter"), 3);
+    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadBytesCounter"),
+        TEST_DATA_SIZE);
+    zkClient.readDataAndStat(TEST_PATH, new Stat(), true);
+    Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 5);
+
+    ZkAsyncCallbacks.ExistsCallbackHandler callbackHandler =
+        new ZkAsyncCallbacks.ExistsCallbackHandler();
+    zkClient.asyncExists(TEST_PATH, callbackHandler);
+    callbackHandler.waitForSuccess();
+    Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 6);
+
+    // Test write
+    zkClient.writeData(TEST_PATH, TEST_DATA);
+    Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteCounter"), 2);
+    Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteBytesCounter"),
+        TEST_DATA_SIZE * 2);
+    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteCounter"), 2);
+    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteBytesCounter"),
+        TEST_DATA_SIZE * 2);
+    Assert.assertTrue((long) beanServer.getAttribute(rootname, "WriteTotalLatencyCounter")
+        >= origWriteTotalLatencyCounter);
+    Assert.assertTrue((long) beanServer.getAttribute(idealStatename, "WriteTotalLatencyCounter")
+        >= origIdealStatesWriteTotalLatencyCounter);
+
+    // Test data change count
+    final Lock lock = new ReentrantLock();
+    final Condition callbackFinish = lock.newCondition();
+    zkClient.subscribeDataChanges(TEST_PATH, new IZkDataListener() {
+      @Override
+      public void handleDataChange(String dataPath, Object data) throws Exception {
+      }
+
+      @Override
+      public void handleDataDeleted(String dataPath) throws Exception {
+        lock.lock();
+        try {
+          callbackFinish.signal();
+        } finally {
+          lock.unlock();
+        }
+      }
+    });
+    lock.lock();
+    _zkClient.delete(TEST_PATH);
+    Assert.assertTrue(callbackFinish.await(10, TimeUnit.SECONDS));
+    Assert.assertEquals((long) beanServer.getAttribute(name, "DataChangeEventCounter"), 1);
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClient.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClient.java
deleted file mode 100644
index a18dd29..0000000
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClient.java
+++ /dev/null
@@ -1,294 +0,0 @@
-package org.apache.helix.manager.zk;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.lang.management.ManagementFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-import org.I0Itec.zkclient.IZkDataListener;
-import org.I0Itec.zkclient.IZkStateListener;
-import org.I0Itec.zkclient.ZkConnection;
-import org.apache.helix.HelixException;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.ZkUnitTestBase;
-import org.apache.helix.monitoring.mbeans.MBeanRegistrar;
-import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
-import org.apache.helix.monitoring.mbeans.ZkClientMonitor;
-import org.apache.helix.monitoring.mbeans.ZkClientPathMonitor;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.Assert;
-import org.testng.AssertJUnit;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-public class TestZkClient extends ZkUnitTestBase {
-  private static Logger LOG = LoggerFactory.getLogger(TestZkClient.class);
-
-  ZkClient _zkClient;
-
-  @BeforeClass
-  public void beforeClass() {
-    _zkClient = new ZkClient(ZK_ADDR);
-    _zkClient.setZkSerializer(new ZNRecordSerializer());
-  }
-
-  @AfterClass
-  public void afterClass() {
-    _zkClient.close();
-  }
-
-  @Test()
-  void testGetStat() {
-    String path = "/tmp/getStatTest";
-    _zkClient.deleteRecursively(path);
-
-    Stat stat, newStat;
-    stat = _zkClient.getStat(path);
-    AssertJUnit.assertNull(stat);
-    _zkClient.createPersistent(path, true);
-
-    stat = _zkClient.getStat(path);
-    AssertJUnit.assertNotNull(stat);
-
-    newStat = _zkClient.getStat(path);
-    AssertJUnit.assertEquals(stat, newStat);
-
-    _zkClient.writeData(path, new ZNRecord("Test"));
-    newStat = _zkClient.getStat(path);
-    AssertJUnit.assertNotSame(stat, newStat);
-  }
-
-  @Test()
-  void testSessionExpire() throws Exception {
-    IZkStateListener listener = new IZkStateListener() {
-
-      @Override
-      public void handleStateChanged(KeeperState state) throws Exception {
-        System.out.println("In Old connection New state " + state);
-      }
-
-      @Override
-      public void handleNewSession() throws Exception {
-        System.out.println("In Old connection New session");
-      }
-
-      @Override
-      public void handleSessionEstablishmentError(Throwable var1) throws Exception {
-      }
-    };
-
-    _zkClient.subscribeStateChanges(listener);
-    ZkConnection connection = ((ZkConnection) _zkClient.getConnection());
-    ZooKeeper zookeeper = connection.getZookeeper();
-    System.out.println("old sessionId= " + zookeeper.getSessionId());
-    Watcher watcher = new Watcher() {
-      @Override
-      public void process(WatchedEvent event) {
-        System.out.println("In New connection In process event:" + event);
-      }
-    };
-    ZooKeeper newZookeeper =
-        new ZooKeeper(connection.getServers(), zookeeper.getSessionTimeout(), watcher,
-            zookeeper.getSessionId(), zookeeper.getSessionPasswd());
-    Thread.sleep(3000);
-    System.out.println("New sessionId= " + newZookeeper.getSessionId());
-    Thread.sleep(3000);
-    newZookeeper.close();
-    Thread.sleep(10000);
-    connection = ((ZkConnection) _zkClient.getConnection());
-    zookeeper = connection.getZookeeper();
-    System.out.println("After session expiry sessionId= " + zookeeper.getSessionId());
-  }
-
-  @Test(expectedExceptions = HelixException.class, expectedExceptionsMessageRegExp = "Data size larger than 1M.*")
-  void testDataSizeLimit() {
-    ZNRecord data = new ZNRecord(new String(new char[1024 * 1024]));
-    _zkClient.writeData("/test", data, -1);
-  }
-
-  @Test
-  public void testZkClientMonitor() throws Exception {
-    final String TEST_TAG = "test_monitor";
-    final String TEST_KEY = "test_key";
-    final String TEST_DATA = "testData";
-    final String TEST_ROOT = "/my_cluster/IDEALSTATES";
-    final String TEST_NODE = "/test_zkclient_monitor";
-    final String TEST_PATH = TEST_ROOT + TEST_NODE;
-
-    ZkClient.Builder builder = new ZkClient.Builder();
-    builder.setZkServer(ZK_ADDR).setMonitorKey(TEST_KEY).setMonitorType(TEST_TAG)
-        .setMonitorRootPathOnly(false);
-    ZkClient zkClient = builder.build();
-
-    final long TEST_DATA_SIZE = zkClient.serialize(TEST_DATA, TEST_PATH).length;
-
-    if (_zkClient.exists(TEST_PATH)) {
-      _zkClient.delete(TEST_PATH);
-    }
-    if (!_zkClient.exists(TEST_ROOT)) {
-      _zkClient.createPersistent(TEST_ROOT, true);
-    }
-
-    MBeanServer beanServer = ManagementFactory.getPlatformMBeanServer();
-
-    ObjectName name = MBeanRegistrar
-        .buildObjectName(MonitorDomainNames.HelixZkClient.name(), ZkClientMonitor.MONITOR_TYPE,
-            TEST_TAG, ZkClientMonitor.MONITOR_KEY, TEST_KEY);
-    ObjectName rootname = MBeanRegistrar
-        .buildObjectName(MonitorDomainNames.HelixZkClient.name(), ZkClientMonitor.MONITOR_TYPE,
-            TEST_TAG, ZkClientMonitor.MONITOR_KEY, TEST_KEY, ZkClientPathMonitor.MONITOR_PATH,
-            "Root");
-    ObjectName idealStatename = MBeanRegistrar
-        .buildObjectName(MonitorDomainNames.HelixZkClient.name(), ZkClientMonitor.MONITOR_TYPE,
-            TEST_TAG, ZkClientMonitor.MONITOR_KEY, TEST_KEY, ZkClientPathMonitor.MONITOR_PATH,
-            "IdealStates");
-    Assert.assertTrue(beanServer.isRegistered(rootname));
-    Assert.assertTrue(beanServer.isRegistered(idealStatename));
-
-    // Test exists
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 0);
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadTotalLatencyCounter"), 0);
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadLatencyGauge.Max"), 0);
-    zkClient.exists(TEST_ROOT);
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 1);
-    Assert.assertTrue((long) beanServer.getAttribute(rootname, "ReadTotalLatencyCounter") >= 0);
-    Assert.assertTrue((long) beanServer.getAttribute(rootname, "ReadLatencyGauge.Max") >= 0);
-
-    // Test create
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteCounter"), 0);
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteBytesCounter"), 0);
-    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteCounter"), 0);
-    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteBytesCounter"), 0);
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteTotalLatencyCounter"), 0);
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteLatencyGauge.Max"), 0);
-    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteTotalLatencyCounter"),
-        0);
-    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteLatencyGauge.Max"), 0);
-    zkClient.create(TEST_PATH, TEST_DATA, CreateMode.PERSISTENT);
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteCounter"), 1);
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteBytesCounter"),
-        TEST_DATA_SIZE);
-    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteCounter"), 1);
-    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteBytesCounter"),
-        TEST_DATA_SIZE);
-    long origWriteTotalLatencyCounter =
-        (long) beanServer.getAttribute(rootname, "WriteTotalLatencyCounter");
-    Assert.assertTrue(origWriteTotalLatencyCounter >= 0);
-    Assert.assertTrue((long) beanServer.getAttribute(rootname, "WriteLatencyGauge.Max") >= 0);
-    long origIdealStatesWriteTotalLatencyCounter =
-        (long) beanServer.getAttribute(idealStatename, "WriteTotalLatencyCounter");
-    Assert.assertTrue(origIdealStatesWriteTotalLatencyCounter >= 0);
-    Assert.assertTrue((long) beanServer.getAttribute(idealStatename, "WriteLatencyGauge.Max") >= 0);
-
-    // Test read
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 1);
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadBytesCounter"), 0);
-    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadCounter"), 0);
-    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadBytesCounter"), 0);
-    long origReadTotalLatencyCounter =
-        (long) beanServer.getAttribute(rootname, "ReadTotalLatencyCounter");
-    long origIdealStatesReadTotalLatencyCounter =
-        (long) beanServer.getAttribute(idealStatename, "ReadTotalLatencyCounter");
-    Assert.assertEquals(origIdealStatesReadTotalLatencyCounter, 0);
-    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadLatencyGauge.Max"), 0);
-    zkClient.readData(TEST_PATH, new Stat());
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 2);
-    Assert
-        .assertEquals((long) beanServer.getAttribute(rootname, "ReadBytesCounter"), TEST_DATA_SIZE);
-    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadCounter"), 1);
-    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadBytesCounter"),
-        TEST_DATA_SIZE);
-    Assert.assertTrue((long) beanServer.getAttribute(rootname, "ReadTotalLatencyCounter")
-        >= origReadTotalLatencyCounter);
-    Assert.assertTrue((long) beanServer.getAttribute(idealStatename, "ReadTotalLatencyCounter")
-        >= origIdealStatesReadTotalLatencyCounter);
-    Assert.assertTrue((long) beanServer.getAttribute(idealStatename, "ReadLatencyGauge.Max") >= 0);
-    zkClient.getChildren(TEST_PATH);
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 3);
-    Assert
-        .assertEquals((long) beanServer.getAttribute(rootname, "ReadBytesCounter"), TEST_DATA_SIZE);
-    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadCounter"), 2);
-    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadBytesCounter"),
-        TEST_DATA_SIZE);
-    zkClient.getStat(TEST_PATH);
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 4);
-    Assert
-        .assertEquals((long) beanServer.getAttribute(rootname, "ReadBytesCounter"), TEST_DATA_SIZE);
-    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadCounter"), 3);
-    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadBytesCounter"),
-        TEST_DATA_SIZE);
-    zkClient.readDataAndStat(TEST_PATH, new Stat(), true);
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 5);
-
-    ZkAsyncCallbacks.ExistsCallbackHandler callbackHandler =
-        new ZkAsyncCallbacks.ExistsCallbackHandler();
-    zkClient.asyncExists(TEST_PATH, callbackHandler);
-    callbackHandler.waitForSuccess();
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 6);
-
-    // Test write
-    zkClient.writeData(TEST_PATH, TEST_DATA);
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteCounter"), 2);
-    Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteBytesCounter"),
-        TEST_DATA_SIZE * 2);
-    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteCounter"), 2);
-    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteBytesCounter"),
-        TEST_DATA_SIZE * 2);
-    Assert.assertTrue((long) beanServer.getAttribute(rootname, "WriteTotalLatencyCounter")
-        >= origWriteTotalLatencyCounter);
-    Assert.assertTrue((long) beanServer.getAttribute(idealStatename, "WriteTotalLatencyCounter")
-        >= origIdealStatesWriteTotalLatencyCounter);
-
-    // Test data change count
-    final Lock lock = new ReentrantLock();
-    final Condition callbackFinish = lock.newCondition();
-    zkClient.subscribeDataChanges(TEST_PATH, new IZkDataListener() {
-      @Override
-      public void handleDataChange(String dataPath, Object data) throws Exception {
-      }
-
-      @Override
-      public void handleDataDeleted(String dataPath) throws Exception {
-        lock.lock();
-        try {
-          callbackFinish.signal();
-        } finally {
-          lock.unlock();
-        }
-      }
-    });
-    lock.lock();
-    _zkClient.delete(TEST_PATH);
-    Assert.assertTrue(callbackFinish.await(10, TimeUnit.SECONDS));
-    Assert.assertEquals((long) beanServer.getAttribute(name, "DataChangeEventCounter"), 1);
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkReconnect.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkReconnect.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkReconnect.java
index 1f72948..691623e 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkReconnect.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkReconnect.java
@@ -32,6 +32,7 @@ import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
 import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.TestHelper;
+import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.zookeeper.WatchedEvent;
@@ -94,7 +95,7 @@ public class TestZkReconnect {
       // 1. shutdown zkServer and check if handler trigger callback
       zkServer.shutdown();
       // Simulate a retry in ZkClient that will not succeed
-      injectExpire(controller._zkclient);
+      injectExpire((ZkClient) controller._zkclient);
       Assert.assertFalse(controller._zkclient.waitUntilConnected(5000, TimeUnit.MILLISECONDS));
       // While retrying, onDisconnectedFlag = false
       Assert.assertFalse(onDisconnectedFlag.get());
@@ -102,7 +103,7 @@ public class TestZkReconnect {
       // 2. restart zkServer and check if handler will recover connection
       zkServer.start();
       Assert.assertTrue(controller._zkclient
-          .waitUntilConnected(ZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS));
+          .waitUntilConnected(HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS));
       Assert.assertTrue(controller.isConnected());
 
       // New propertyStore should be in good state

http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/test/java/org/apache/helix/manager/zk/client/TestHelixZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/client/TestHelixZkClient.java b/helix-core/src/test/java/org/apache/helix/manager/zk/client/TestHelixZkClient.java
new file mode 100644
index 0000000..67e2731
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/client/TestHelixZkClient.java
@@ -0,0 +1,219 @@
+package org.apache.helix.manager.zk.client;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.I0Itec.zkclient.IZkDataListener;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.helix.HelixException;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZkUnitTestBase;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Tests for {@link ZkConnectionManager} and the {@link SharedZkClient}s that share its connection.
+ */
+public class TestHelixZkClient extends ZkUnitTestBase {
+  private static final String TEST_NODE = "/test_helix_zkclient";
+
+  /**
+   * Verifies that a ZkConnectionManager can read/write ZK, refuses to close while a
+   * SharedZkClient uses it, closes once sharing ends, and cannot be shared after closing.
+   */
+  @Test
+  public void testZkConnectionManager() {
+    final String TEST_ROOT = "/testZkConnectionManager/IDEALSTATES";
+    final String TEST_PATH = TEST_ROOT + TEST_NODE;
+
+    ZkConnectionManager zkConnectionManager =
+        new ZkConnectionManager(new ZkConnection(ZK_ADDR), HelixZkClient.DEFAULT_CONNECTION_TIMEOUT,
+            null);
+    Assert.assertTrue(zkConnectionManager.waitUntilConnected(1, TimeUnit.SECONDS));
+
+    // This client can write/read from ZK
+    zkConnectionManager.createPersistent(TEST_PATH, true);
+    zkConnectionManager.writeData(TEST_PATH, "Test");
+    Assert.assertNotNull(zkConnectionManager.readData(TEST_PATH));
+    zkConnectionManager.deleteRecursively(TEST_ROOT);
+
+    // This client can be shared, and cannot be closed while it is being shared
+    SharedZkClient sharedZkClient =
+        new SharedZkClient(zkConnectionManager, new HelixZkClient.ZkClientConfig(), null);
+    try {
+      zkConnectionManager.close();
+      Assert.fail("Dedicated ZkClient cannot be closed while sharing!");
+    } catch (HelixException hex) {
+      // expected
+    }
+
+    // This client can be closed normally once sharing ends
+    sharedZkClient.close();
+    Assert.assertTrue(sharedZkClient.isClosed());
+    Assert.assertFalse(sharedZkClient.waitUntilConnected(100, TimeUnit.MILLISECONDS));
+
+    zkConnectionManager.close();
+    Assert.assertTrue(zkConnectionManager.isClosed());
+    Assert.assertFalse(zkConnectionManager.waitUntilConnected(100, TimeUnit.MILLISECONDS));
+
+    // Sharing a closed dedicated ZkClient shall fail
+    try {
+      new SharedZkClient(zkConnectionManager, new HelixZkClient.ZkClientConfig(), null);
+      Assert.fail("Sharing a closed dedicated ZkClient shall fail.");
+    } catch (HelixException hex) {
+      // expected
+    }
+  }
+
+  /**
+   * Verifies that shared clients built by one SharedZkClientFactory reuse a single connection and
+   * session, get independent watcher notifications, and release the connection when all close.
+   */
+  @Test(dependsOnMethods = "testZkConnectionManager")
+  public void testSharingZkClient() throws Exception {
+    final String TEST_ROOT = "/testSharedZkClient/IDEALSTATES";
+    final String TEST_PATH = TEST_ROOT + TEST_NODE;
+
+    // A factory dedicated to this test, to avoid interference from tests running in parallel.
+    final SharedZkClientFactory testFactory = new SharedZkClientFactory();
+
+    HelixZkClient.ZkConnectionConfig connectionConfig =
+        new HelixZkClient.ZkConnectionConfig(ZK_ADDR);
+    HelixZkClient sharedZkClientA =
+        testFactory.buildZkClient(connectionConfig, new HelixZkClient.ZkClientConfig());
+    Assert.assertTrue(sharedZkClientA.waitUntilConnected(1, TimeUnit.SECONDS));
+
+    HelixZkClient sharedZkClientB =
+        testFactory.buildZkClient(connectionConfig, new HelixZkClient.ZkClientConfig());
+    Assert.assertTrue(sharedZkClientB.waitUntilConnected(1, TimeUnit.SECONDS));
+
+    Assert.assertEquals(testFactory.getActiveConnectionCount(), 1);
+
+    // Clients A and B share the same session.
+    Assert.assertEquals(sharedZkClientA.getSessionId(), sharedZkClientB.getSessionId());
+    long sessionId = sharedZkClientA.getSessionId();
+
+    final CountingDataListener listenerA = new CountingDataListener();
+    sharedZkClientA.subscribeDataChanges(TEST_PATH, listenerA);
+    final CountingDataListener listenerB = new CountingDataListener();
+    sharedZkClientB.subscribeDataChanges(TEST_PATH, listenerB);
+
+    // Modify using client A; client B will get the notification.
+    sharedZkClientA.createPersistent(TEST_PATH, true);
+    Assert.assertTrue(TestHelper.verify(new TestHelper.Verifier() {
+      @Override
+      public boolean verify() {
+        return listenerB.changes.get() == 1;
+      }
+    }, 1000));
+    Assert.assertEquals(listenerB.deletes.get(), 0);
+
+    sharedZkClientA.deleteRecursively(TEST_ROOT);
+    Assert.assertTrue(TestHelper.verify(new TestHelper.Verifier() {
+      @Override
+      public boolean verify() {
+        return listenerB.deletes.get() == 1;
+      }
+    }, 1000));
+    Assert.assertEquals(listenerB.changes.get(), 1);
+
+    try {
+      sharedZkClientA.createEphemeral(TEST_PATH, true);
+      Assert.fail("Create Ephemeral nodes using shared client should fail.");
+    } catch (HelixException he) {
+      // expected.
+    }
+
+    sharedZkClientA.close();
+    // Shared client A closed.
+    Assert.assertTrue(sharedZkClientA.isClosed());
+    Assert.assertFalse(sharedZkClientA.waitUntilConnected(100, TimeUnit.MILLISECONDS));
+    // Shared client B still open.
+    Assert.assertFalse(sharedZkClientB.isClosed());
+    Assert.assertTrue(sharedZkClientB.waitUntilConnected(100, TimeUnit.MILLISECONDS));
+
+    // Client A cannot modify anything once closed.
+    try {
+      sharedZkClientA.createPersistent(TEST_PATH, true);
+      Assert.fail("Should not be able to create node with a closed client.");
+    } catch (Exception e) {
+      // expected to be here.
+    }
+
+    // Now modify using client B; client A won't get the notification.
+    sharedZkClientB.createPersistent(TEST_PATH, true);
+    Assert.assertTrue(TestHelper.verify(new TestHelper.Verifier() {
+      @Override
+      public boolean verify() {
+        return listenerB.changes.get() == 2;
+      }
+    }, 1000));
+    Assert.assertFalse(TestHelper.verify(new TestHelper.Verifier() {
+      @Override
+      public boolean verify() {
+        return listenerA.changes.get() == 2;
+      }
+    }, 1000));
+    sharedZkClientB.deleteRecursively(TEST_ROOT);
+
+    Assert.assertEquals(testFactory.getActiveConnectionCount(), 1);
+
+    sharedZkClientB.close();
+    // Shared client B closed.
+    Assert.assertTrue(sharedZkClientB.isClosed());
+    Assert.assertFalse(sharedZkClientB.waitUntilConnected(100, TimeUnit.MILLISECONDS));
+    Assert.assertEquals(testFactory.getActiveConnectionCount(), 0);
+
+    // Building a new shared ZkClient now opens a new connection, hence a different session.
+    HelixZkClient sharedZkClientC =
+        testFactory.buildZkClient(connectionConfig, new HelixZkClient.ZkClientConfig());
+    Assert.assertFalse(sessionId == sharedZkClientC.getSessionId(),
+        "A new shared connection is expected to use a new session.");
+    Assert.assertEquals(testFactory.getActiveConnectionCount(), 1);
+
+    sharedZkClientC.close();
+    // Shared client C closed.
+    Assert.assertTrue(sharedZkClientC.isClosed());
+    Assert.assertFalse(sharedZkClientC.waitUntilConnected(100, TimeUnit.MILLISECONDS));
+    Assert.assertEquals(testFactory.getActiveConnectionCount(), 0);
+  }
+
+  /**
+   * Data listener that counts change and delete notifications. The callbacks arrive
+   * asynchronously (the test polls for them), so the counters are atomic to make the updates
+   * visible to the asserting thread.
+   */
+  private static class CountingDataListener implements IZkDataListener {
+    final AtomicInteger changes = new AtomicInteger();
+    final AtomicInteger deletes = new AtomicInteger();
+
+    @Override
+    public void handleDataChange(String dataPath, Object data) {
+      changes.incrementAndGet();
+    }
+
+    @Override
+    public void handleDataDeleted(String dataPath) {
+      deletes.incrementAndGet();
+    }
+  }
+}


[2/2] helix git commit: Introduce Helix ZkClient factory. And use the factory to generate new ZkClient in the critical Helix components.

Posted by jx...@apache.org.
Introduce Helix ZkClient factory. And use the factory to generate new ZkClient in the critical Helix components.

The motivation for this change is to share ZkConnections as much as possible.
DedicatedZkClient: a client that uses its own connection.
SharedZkClient: a client that shares a single ZkConnection with other shared clients.

Also defines a safer client interface, HelixZkClient, which hides the internal ZkConnection.

For the critical Helix components, the plan is:
- HelixManager (CONTROLLER, PARTICIPANT, CONTROLLER_PARTICIPANT, SPECTATOR): Dedicated ZkClient
- HelixManager (ADMINISTRATOR): Shared ZkClient
- HelixPropertyStore: Shared ZkClient
- HelixZkAccess or ZkAdmin: Shared ZkClient

ZkClient Guidelines
- DedicatedZkClient
  Isolated; no additional latency from sharing a connection.
- SharedZkClient
  Resource efficient, but may introduce more latency because the watchers of all sharing clients are handled sequentially on a single connection.
  Multiple clients use the same session, so a major difference between shared and dedicated clients is that closing a shared client does not automatically close the session. Given this limitation, creating ephemeral nodes with a shared ZkClient is not supported.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/7bb55742
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/7bb55742
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/7bb55742

Branch: refs/heads/master
Commit: 7bb55742e2fe2b61c634dd559cf86a71da50fcdf
Parents: 281f5d1
Author: Jiajun Wang <jj...@linkedin.com>
Authored: Fri Aug 10 10:31:36 2018 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Mon Oct 29 17:40:24 2018 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/ConfigAccessor.java   |   6 +-
 .../helix/manager/zk/CallbackHandler.java       |  11 +-
 .../helix/manager/zk/ParticipantManager.java    |   4 +-
 .../apache/helix/manager/zk/ZKHelixAdmin.java   |  50 +--
 .../apache/helix/manager/zk/ZKHelixManager.java |  52 ++--
 .../org/apache/helix/manager/zk/ZKUtil.java     |  27 +-
 .../helix/manager/zk/ZkBaseDataAccessor.java    |  78 +----
 .../manager/zk/ZkCacheBaseDataAccessor.java     |  48 +--
 .../org/apache/helix/manager/zk/ZkClient.java   |  27 +-
 .../zk/client/DedicatedZkClientFactory.java     |  35 +++
 .../helix/manager/zk/client/HelixZkClient.java  | 306 +++++++++++++++++++
 .../manager/zk/client/HelixZkClientFactory.java |  46 +++
 .../helix/manager/zk/client/SharedZkClient.java |  92 ++++++
 .../zk/client/SharedZkClientFactory.java        |  87 ++++++
 .../manager/zk/client/ZkConnectionManager.java  | 120 ++++++++
 .../helix/manager/zk/zookeeper/ZkClient.java    | 146 +++++----
 .../participant/HelixCustomCodeRunner.java      |  16 +-
 .../org/apache/helix/tools/MessagePoster.java   |   8 +-
 .../org/apache/helix/tools/TestExecutor.java    |  13 +-
 .../helix/tools/commandtools/ZKDumper.java      |   8 +-
 .../apache/helix/tools/commandtools/ZkCopy.java |  36 +--
 .../integration/TestResourceGroupEndtoEnd.java  |   2 +-
 .../manager/ClusterControllerManager.java       |   2 +-
 .../manager/ClusterDistributedController.java   |   2 +-
 .../manager/MockParticipantManager.java         |   2 +-
 .../helix/manager/zk/TestRawZkClient.java       | 294 ++++++++++++++++++
 .../apache/helix/manager/zk/TestZkClient.java   | 294 ------------------
 .../helix/manager/zk/TestZkReconnect.java       |   5 +-
 .../manager/zk/client/TestHelixZkClient.java    | 197 ++++++++++++
 29 files changed, 1464 insertions(+), 550 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
index 70df719..2755113 100644
--- a/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
@@ -27,12 +27,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
+import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ConfigScope;
 import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.helix.manager.zk.ZKUtil;
-import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
@@ -66,13 +66,13 @@ public class ConfigAccessor {
     // @formatter:on
   }
 
-  private final ZkClient zkClient;
+  private final HelixZkClient zkClient;
 
   /**
    * Initialize an accessor with a Zookeeper client
    * @param zkClient
    */
-  public ConfigAccessor(ZkClient zkClient) {
+  public ConfigAccessor(HelixZkClient zkClient) {
     this.zkClient = zkClient;
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
index cd446e8..b6d452d 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
@@ -57,6 +57,7 @@ import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyPathConfig;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.common.DedupEventProcessor;
+import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.ExternalView;
@@ -94,7 +95,7 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
   private final Set<EventType> _eventTypes;
   private final HelixDataAccessor _accessor;
   private final ChangeType _changeType;
-  private final ZkClient _zkClient;
+  private final HelixZkClient _zkClient;
   private final AtomicLong _lastNotificationTimeStamp;
   private final HelixManager _manager;
   private final PropertyKey _propertyKey;
@@ -178,14 +179,22 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
    */
   private List<NotificationContext.Type> _expectTypes = nextNotificationType.get(Type.FINALIZE);
 
+  @Deprecated
   public CallbackHandler(HelixManager manager, ZkClient client, PropertyKey propertyKey,
       Object listener, EventType[] eventTypes, ChangeType changeType) {
     this(manager, client, propertyKey, listener, eventTypes, changeType, null);
   }
 
+  @Deprecated
   public CallbackHandler(HelixManager manager, ZkClient client, PropertyKey propertyKey,
       Object listener, EventType[] eventTypes, ChangeType changeType,
       HelixCallbackMonitor monitor) {
+    this(manager, (HelixZkClient) client, propertyKey, listener, eventTypes, changeType, monitor);
+  }
+
+  public CallbackHandler(HelixManager manager, HelixZkClient client, PropertyKey propertyKey,
+        Object listener, EventType[] eventTypes, ChangeType changeType,
+        HelixCallbackMonitor monitor) {
     if (listener == null) {
       throw new HelixException("listener could not be null");
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
index c1d96c8..36fb969 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
@@ -39,6 +39,7 @@ import org.apache.helix.PreConnectCallback;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.ZNRecordBucketizer;
+import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.helix.messaging.DefaultMessagingService;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.HelixConfigScope;
@@ -47,7 +48,6 @@ import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.ParticipantHistory;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
-import org.apache.helix.model.Message.MessageType;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.participant.statemachine.ScheduledTaskStateModelFactory;
@@ -61,7 +61,7 @@ import org.apache.zookeeper.data.Stat;
 public class ParticipantManager {
   private static Logger LOG = LoggerFactory.getLogger(ParticipantManager.class);
 
-  final ZkClient _zkclient;
+  final HelixZkClient _zkclient;
   final HelixManager _manager;
   final PropertyKey.Builder _keyBuilder;
   final String _clusterName;

http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 59336fd..0f79175 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -36,6 +36,7 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+
 import org.I0Itec.zkclient.DataUpdater;
 import org.I0Itec.zkclient.exception.ZkNoNodeException;
 import org.apache.helix.AccessOption;
@@ -53,6 +54,8 @@ import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.PropertyType;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ClusterConstraints;
 import org.apache.helix.model.ClusterConstraints.ConstraintType;
@@ -78,20 +81,23 @@ import org.slf4j.LoggerFactory;
 
 public class ZKHelixAdmin implements HelixAdmin {
   public static final String CONNECTION_TIMEOUT = "helixAdmin.timeOutInSec";
-  private final ZkClient _zkClient;
+  private final HelixZkClient _zkClient;
   private final ConfigAccessor _configAccessor;
 
   private static Logger logger = LoggerFactory.getLogger(ZKHelixAdmin.class);
 
-  public ZKHelixAdmin(ZkClient zkClient) {
+  public ZKHelixAdmin(HelixZkClient zkClient) {
     _zkClient = zkClient;
     _configAccessor = new ConfigAccessor(zkClient);
   }
 
   public ZKHelixAdmin(String zkAddress) {
     int timeOutInSec = Integer.parseInt(System.getProperty(CONNECTION_TIMEOUT, "30"));
-    _zkClient = new ZkClient(zkAddress, timeOutInSec * 1000);
-    _zkClient.setZkSerializer(new ZNRecordSerializer());
+    HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
+    clientConfig.setZkSerializer(new ZNRecordSerializer())
+        .setConnectInitTimeout(timeOutInSec * 1000);
+    _zkClient = SharedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress), clientConfig);
     _zkClient.waitUntilConnected(timeOutInSec, TimeUnit.SECONDS);
     _configAccessor = new ConfigAccessor(_zkClient);
   }
@@ -207,8 +213,7 @@ public class ZKHelixAdmin implements HelixAdmin {
   }
 
   @Override
-  public void enableInstance(String clusterName, List<String> instances,
-      boolean enabled) {
+  public void enableInstance(String clusterName, List<String> instances, boolean enabled) {
     // TODO: Reenable this after storage node bug fixed.
     if (true) {
       throw new HelixException("Current batch enable/disable instances are temporarily disabled!");
@@ -698,8 +703,7 @@ public class ZKHelixAdmin implements HelixAdmin {
   }
 
   @Override
-  public void addResource(String clusterName, String resourceName,
-      IdealState idealstate) {
+  public void addResource(String clusterName, String resourceName, IdealState idealstate) {
     logger.info("Add resource {} in cluster {}.", resourceName, clusterName);
     String stateModelRef = idealstate.getStateModelDefRef();
     String stateModelDefPath = PropertyPathBuilder.stateModelDef(clusterName, stateModelRef);
@@ -874,8 +878,7 @@ public class ZKHelixAdmin implements HelixAdmin {
   }
 
   @Override
-  public StateModelDefinition getStateModelDef(String clusterName,
-      String stateModelName) {
+  public StateModelDefinition getStateModelDef(String clusterName, String stateModelName) {
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
     Builder keyBuilder = accessor.keyBuilder();
@@ -965,8 +968,8 @@ public class ZKHelixAdmin implements HelixAdmin {
   }
 
   @Override
-  public void rebalance(String clusterName, String resourceName, int replica,
-      String keyPrefix, String group) {
+  public void rebalance(String clusterName, String resourceName, int replica, String keyPrefix,
+      String group) {
     List<String> instanceNames = new LinkedList<String>();
     if (keyPrefix == null || keyPrefix.length() == 0) {
       keyPrefix = resourceName;
@@ -1074,8 +1077,8 @@ public class ZKHelixAdmin implements HelixAdmin {
   }
 
   @Override
-  public void addIdealState(String clusterName, String resourceName,
-      String idealStateFile) throws IOException {
+  public void addIdealState(String clusterName, String resourceName, String idealStateFile)
+      throws IOException {
     logger.info("Add IdealState for resource {} to cluster {} by file name {}.", resourceName,
         clusterName, idealStateFile);
     ZNRecord idealStateRecord =
@@ -1132,9 +1135,9 @@ public class ZKHelixAdmin implements HelixAdmin {
     baseAccessor.update(path, new DataUpdater<ZNRecord>() {
       @Override
       public ZNRecord update(ZNRecord currentData) {
-        ClusterConstraints constraints = currentData == null
-            ? new ClusterConstraints(constraintType)
-            : new ClusterConstraints(currentData);
+        ClusterConstraints constraints = currentData == null ?
+            new ClusterConstraints(constraintType) :
+            new ClusterConstraints(currentData);
 
         constraints.addConstraintItem(constraintId, constraintItem);
         return constraints.getRecord();
@@ -1167,8 +1170,7 @@ public class ZKHelixAdmin implements HelixAdmin {
   }
 
   @Override
-  public ClusterConstraints getConstraints(String clusterName,
-      ConstraintType constraintType) {
+  public ClusterConstraints getConstraints(String clusterName, ConstraintType constraintType) {
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
 
@@ -1183,7 +1185,6 @@ public class ZKHelixAdmin implements HelixAdmin {
    * @param clusterName
    * @param currentIdealState
    * @param instanceNames
-   *
    * @return
    */
   @Override
@@ -1250,7 +1251,8 @@ public class ZKHelixAdmin implements HelixAdmin {
     }
 
     if (!ZKUtil.isInstanceSetup(_zkClient, clusterName, instanceName, InstanceType.PARTICIPANT)) {
-      throw new HelixException("cluster " + clusterName + " instance " + instanceName + " is not setup yet");
+      throw new HelixException(
+          "cluster " + clusterName + " instance " + instanceName + " is not setup yet");
     }
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
@@ -1318,8 +1320,7 @@ public class ZKHelixAdmin implements HelixAdmin {
   }
 
   @Override
-  public void enableBatchMessageMode(String clusterName, String resourceName,
-      boolean enabled) {
+  public void enableBatchMessageMode(String clusterName, String resourceName, boolean enabled) {
     logger.info("{} batch message mode for resource {} in cluster {}.",
         enabled ? "Enable" : "Disable", resourceName, clusterName);
     // TODO: Change IdealState to ResourceConfig when configs are migrated to ResourceConfig
@@ -1425,7 +1426,8 @@ public class ZKHelixAdmin implements HelixAdmin {
     return instances;
   }
 
-  @Override public void close() {
+  @Override
+  public void close() {
     if (_zkClient != null) {
       _zkClient.close();
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index e6fabc1..98e2737 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -20,12 +20,11 @@ package org.apache.helix.manager.zk;
  */
 
 import org.I0Itec.zkclient.IZkStateListener;
-import org.I0Itec.zkclient.ZkConnection;
 import org.I0Itec.zkclient.exception.ZkInterruptedException;
 import org.apache.helix.*;
 import org.apache.helix.HelixConstants.ChangeType;
 import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.api.listeners.*;
+import org.apache.helix.api.listeners.ClusterConfigChangeListener;
 import org.apache.helix.api.listeners.ConfigChangeListener;
 import org.apache.helix.api.listeners.ControllerChangeListener;
 import org.apache.helix.api.listeners.CurrentStateChangeListener;
@@ -34,11 +33,15 @@ import org.apache.helix.api.listeners.IdealStateChangeListener;
 import org.apache.helix.api.listeners.InstanceConfigChangeListener;
 import org.apache.helix.api.listeners.LiveInstanceChangeListener;
 import org.apache.helix.api.listeners.MessageListener;
+import org.apache.helix.api.listeners.ResourceConfigChangeListener;
 import org.apache.helix.api.listeners.ScopedConfigChangeListener;
 import org.apache.helix.controller.GenericHelixController;
 import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
 import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
 import org.apache.helix.healthcheck.ParticipantHealthReportTask;
+import org.apache.helix.manager.zk.client.DedicatedZkClientFactory;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
 import org.apache.helix.messaging.DefaultMessagingService;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
@@ -65,7 +68,6 @@ import java.util.Map;
 import java.util.Timer;
 import java.util.concurrent.TimeUnit;
 
-
 public class ZKHelixManager implements HelixManager, IZkStateListener {
   private static Logger LOG = LoggerFactory.getLogger(ZKHelixManager.class);
 
@@ -92,7 +94,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
   private final String _version;
   private int _reportLatency;
 
-  protected ZkClient _zkclient = null;
+  protected HelixZkClient _zkclient = null;
   private final DefaultMessagingService _messagingService;
   private Map<ChangeType, HelixCallbackMonitor> _callbackMonitors;
 
@@ -229,11 +231,11 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
             ZKHelixManager.DEFAULT_MAX_DISCONNECT_THRESHOLD);
 
     _sessionTimeout = HelixUtil.getSystemPropertyAsInt(SystemPropertyKeys.ZK_SESSION_TIMEOUT,
-        ZkClient.DEFAULT_SESSION_TIMEOUT);
+        HelixZkClient.DEFAULT_SESSION_TIMEOUT);
 
     _connectionInitTimeout = HelixUtil
         .getSystemPropertyAsInt(SystemPropertyKeys.ZK_CONNECTION_TIMEOUT,
-            ZkClient.DEFAULT_CONNECTION_TIMEOUT);
+            HelixZkClient.DEFAULT_CONNECTION_TIMEOUT);
 
     _waitForConnectedTimeout = HelixUtil
         .getSystemPropertyAsInt(SystemPropertyKeys.ZK_WAIT_CONNECTED_TIMEOUT,
@@ -594,17 +596,29 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
     PathBasedZkSerializer zkSerializer =
         ChainedPathZkSerializer.builder(new ZNRecordStreamingSerializer()).build();
 
-    ZkClient.Builder zkClientBuilder = new ZkClient.Builder();
-    zkClientBuilder.setZkServer(_zkAddress)
-        .setSessionTimeout(_sessionTimeout)
-        .setConnectionTimeout(_connectionInitTimeout)
+    HelixZkClient.ZkConnectionConfig connectionConfig = new HelixZkClient.ZkConnectionConfig(_zkAddress);
+    connectionConfig.setSessionTimeout(_sessionTimeout);
+    HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
+    clientConfig
         .setZkSerializer(zkSerializer)
+        .setConnectInitTimeout(_connectionInitTimeout)
         .setMonitorType(_instanceType.name())
         .setMonitorKey(_clusterName)
         .setMonitorInstanceName(_instanceName)
-        .setMonitorRootPathOnly(!_instanceType.equals(InstanceType.CONTROLLER) &&
-            !_instanceType.equals(InstanceType.CONTROLLER_PARTICIPANT));
-    ZkClient newClient = zkClientBuilder.build();
+        .setMonitorRootPathOnly(!_instanceType.equals(InstanceType.CONTROLLER) && !_instanceType
+            .equals(InstanceType.CONTROLLER_PARTICIPANT));
+
+    HelixZkClient newClient;
+    switch (_instanceType) {
+    case ADMINISTRATOR:
+      newClient = SharedZkClientFactory.getInstance().buildZkClient(connectionConfig, clientConfig);
+      break;
+    default:
+      newClient = DedicatedZkClientFactory
+          .getInstance().buildZkClient(connectionConfig, clientConfig);
+      break;
+    }
+
     synchronized (this) {
       if (_zkclient != null) {
         _zkclient.close();
@@ -896,8 +910,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
         continue;
       }
 
-      ZkConnection zkConnection = ((ZkConnection) _zkclient.getConnection());
-      _sessionId = Long.toHexString(zkConnection.getZookeeper().getSessionId());
+      _sessionId = Long.toHexString(_zkclient.getSessionId());
 
       /**
        * at the time we read session-id, zkconnection might be lost again
@@ -906,8 +919,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
     } while (!isConnected || "0".equals(_sessionId));
 
     LOG.info("Handling new session, session id: " + _sessionId + ", instance: " + _instanceName
-        + ", instanceTye: " + _instanceType + ", cluster: " + _clusterName + ", zkconnection: "
-        + ((ZkConnection) _zkclient.getConnection()).getZookeeper());
+        + ", instanceTye: " + _instanceType + ", cluster: " + _clusterName);
   }
 
   void initHandlers(List<CallbackHandler> handlers) {
@@ -960,9 +972,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
   public void handleStateChanged(KeeperState state) {
     switch (state) {
     case SyncConnected:
-      ZkConnection zkConnection = (ZkConnection) _zkclient.getConnection();
-      LOG.info("KeeperState: " + state + ", instance: " + _instanceName + ", type: " + _instanceType
-          + ", zookeeper:" + zkConnection.getZookeeper());
+      LOG.info("KeeperState: " + state + ", instance: " + _instanceName + ", type: " + _instanceType);
       break;
     case Disconnected:
       /**
@@ -1084,7 +1094,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
       _participantManager.reset();
     }
     _participantManager =
-        new ParticipantManager(this, _zkclient, _sessionTimeout, _liveInstanceInfoProvider,
+        new ParticipantManager(this, (ZkClient) _zkclient, _sessionTimeout, _liveInstanceInfoProvider,
             _preConnectCallbacks);
     _participantManager.handleNewSession();
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
index d3ee0c7..f8d1826 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
@@ -28,6 +28,7 @@ import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.zookeeper.CreateMode;
@@ -40,7 +41,7 @@ public final class ZKUtil {
   private ZKUtil() {
   }
 
-  public static boolean isClusterSetup(String clusterName, ZkClient zkClient) {
+  public static boolean isClusterSetup(String clusterName, HelixZkClient zkClient) {
     if (clusterName == null) {
       logger.info("Fail to check cluster setup : cluster name is null!");
       return false;
@@ -86,7 +87,7 @@ public final class ZKUtil {
     return isValid;
   }
 
-  public static boolean isInstanceSetup(ZkClient zkclient, String clusterName, String instanceName,
+  public static boolean isInstanceSetup(HelixZkClient zkclient, String clusterName, String instanceName,
       InstanceType type) {
     if (type == InstanceType.PARTICIPANT || type == InstanceType.CONTROLLER_PARTICIPANT) {
       ArrayList<String> requiredPaths = new ArrayList<>();
@@ -119,7 +120,7 @@ public final class ZKUtil {
     return true;
   }
 
-  public static void createChildren(ZkClient client, String parentPath, List<ZNRecord> list) {
+  public static void createChildren(HelixZkClient client, String parentPath, List<ZNRecord> list) {
     client.createPersistent(parentPath, true);
     if (list != null) {
       for (ZNRecord record : list) {
@@ -128,7 +129,7 @@ public final class ZKUtil {
     }
   }
 
-  public static void createChildren(ZkClient client, String parentPath, ZNRecord nodeRecord) {
+  public static void createChildren(HelixZkClient client, String parentPath, ZNRecord nodeRecord) {
     client.createPersistent(parentPath, true);
 
     String id = nodeRecord.getId();
@@ -136,7 +137,7 @@ public final class ZKUtil {
     client.createPersistent(temp, nodeRecord);
   }
 
-  public static void dropChildren(ZkClient client, String parentPath, List<ZNRecord> list) {
+  public static void dropChildren(HelixZkClient client, String parentPath, List<ZNRecord> list) {
     // TODO: check if parentPath exists
     if (list != null) {
       for (ZNRecord record : list) {
@@ -145,14 +146,14 @@ public final class ZKUtil {
     }
   }
 
-  public static void dropChildren(ZkClient client, String parentPath, ZNRecord nodeRecord) {
+  public static void dropChildren(HelixZkClient client, String parentPath, ZNRecord nodeRecord) {
     // TODO: check if parentPath exists
     String id = nodeRecord.getId();
     String temp = parentPath + "/" + id;
     client.deleteRecursively(temp);
   }
 
-  public static List<ZNRecord> getChildren(ZkClient client, String path) {
+  public static List<ZNRecord> getChildren(HelixZkClient client, String path) {
     // parent watch will be set by zkClient
     List<String> children = client.getChildren(path);
     if (children == null || children.size() == 0) {
@@ -174,7 +175,7 @@ public final class ZKUtil {
     return childRecords;
   }
 
-  public static void updateIfExists(ZkClient client, String path, final ZNRecord record,
+  public static void updateIfExists(HelixZkClient client, String path, final ZNRecord record,
       boolean mergeOnUpdate) {
     if (client.exists(path)) {
       DataUpdater<Object> updater = new DataUpdater<Object>() {
@@ -187,7 +188,7 @@ public final class ZKUtil {
     }
   }
 
-  public static void createOrMerge(ZkClient client, String path, final ZNRecord record,
+  public static void createOrMerge(HelixZkClient client, String path, final ZNRecord record,
       final boolean persistent, final boolean mergeOnUpdate) {
     int retryCount = 0;
     while (retryCount < RETRYLIMIT) {
@@ -223,7 +224,7 @@ public final class ZKUtil {
     }
   }
 
-  public static void createOrUpdate(ZkClient client, String path, final ZNRecord record,
+  public static void createOrUpdate(HelixZkClient client, String path, final ZNRecord record,
       final boolean persistent, final boolean mergeOnUpdate) {
     int retryCount = 0;
     while (retryCount < RETRYLIMIT) {
@@ -252,7 +253,7 @@ public final class ZKUtil {
     }
   }
 
-  public static void asyncCreateOrMerge(ZkClient client, String path, final ZNRecord record,
+  public static void asyncCreateOrMerge(HelixZkClient client, String path, final ZNRecord record,
       final boolean persistent, final boolean mergeOnUpdate) {
     try {
       if (client.exists(path)) {
@@ -287,7 +288,7 @@ public final class ZKUtil {
     }
   }
 
-  public static void createOrReplace(ZkClient client, String path, final ZNRecord record,
+  public static void createOrReplace(HelixZkClient client, String path, final ZNRecord record,
       final boolean persistent) {
     int retryCount = 0;
     while (retryCount < RETRYLIMIT) {
@@ -313,7 +314,7 @@ public final class ZKUtil {
     }
   }
 
-  public static void subtract(ZkClient client, String path, final ZNRecord recordTosubtract) {
+  public static void subtract(HelixZkClient client, String path, final ZNRecord recordTosubtract) {
     int retryCount = 0;
     while (retryCount < RETRYLIMIT) {
       try {

http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
index 6811766..8d932c8 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
@@ -36,13 +36,13 @@ import org.I0Itec.zkclient.exception.ZkNodeExistsException;
 import org.apache.helix.AccessOption;
 import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.HelixException;
-import org.apache.helix.ZNRecord;
 import org.apache.helix.api.exceptions.HelixMetaDataAccessException;
 import org.apache.helix.manager.zk.ZkAsyncCallbacks.CreateCallbackHandler;
 import org.apache.helix.manager.zk.ZkAsyncCallbacks.DeleteCallbackHandler;
 import org.apache.helix.manager.zk.ZkAsyncCallbacks.ExistsCallbackHandler;
 import org.apache.helix.manager.zk.ZkAsyncCallbacks.GetDataCallbackHandler;
 import org.apache.helix.manager.zk.ZkAsyncCallbacks.SetDataCallbackHandler;
+import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.helix.store.zk.ZNode;
 import org.apache.helix.util.HelixUtil;
 import org.apache.zookeeper.CreateMode;
@@ -75,7 +75,7 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
 
     public AccessResult() {
       _retCode = RetCode.ERROR;
-      _pathCreated = new ArrayList<String>();
+      _pathCreated = new ArrayList<>();
       _stat = new Stat();
       _updatedValue = null;
     }
@@ -83,9 +83,9 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
 
   private static Logger LOG = LoggerFactory.getLogger(ZkBaseDataAccessor.class);
 
-  private final ZkClient _zkClient;
+  private final HelixZkClient _zkClient;
 
-  public ZkBaseDataAccessor(ZkClient zkClient) {
+  public ZkBaseDataAccessor(HelixZkClient zkClient) {
     if (zkClient == null) {
       throw new NullPointerException("zkclient is null");
     }
@@ -431,7 +431,6 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
     return getChildren(parentPath, stats, options, false);
   }
 
-
   @Override
   public List<T> getChildren(String parentPath, List<Stat> stats, int options, int retryCount,
       int retryInterval) throws HelixException {
@@ -587,7 +586,7 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
       }
 
       List<String> parentPaths =
-          new ArrayList<String>(Collections.<String> nCopies(paths.size(), null));
+          new ArrayList<>(Collections.<String>nCopies(paths.size(), null));
       boolean failOnNoNode = false;
 
       for (int i = 0; i < paths.size(); i++) {
@@ -658,7 +657,7 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
     boolean[] needCreate = new boolean[paths.size()];
     Arrays.fill(needCreate, true);
     List<List<String>> pathsCreated =
-        new ArrayList<List<String>>(Collections.<List<String>> nCopies(paths.size(), null));
+        new ArrayList<>(Collections.<List<String>>nCopies(paths.size(), null));
 
     long startT = System.nanoTime();
     try {
@@ -712,7 +711,7 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
       return success;
     }
 
-    List<Stat> setStats = new ArrayList<Stat>(Collections.<Stat> nCopies(paths.size(), null));
+    List<Stat> setStats = new ArrayList<>(Collections.<Stat>nCopies(paths.size(), null));
     SetDataCallbackHandler[] cbList = new SetDataCallbackHandler[paths.size()];
     CreateCallbackHandler[] createCbList = null;
     boolean[] needSet = new boolean[paths.size()];
@@ -1106,69 +1105,6 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
     _zkClient.unsubscribeChildChanges(path, childListener);
   }
 
-  // simple test
-  public static void main(String[] args) {
-    ZkClient zkclient = new ZkClient("localhost:2191");
-    zkclient.setZkSerializer(new ZNRecordSerializer());
-    ZkBaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<ZNRecord>(zkclient);
-
-    // test async create
-    List<String> createPaths = Arrays.asList("/test/child1/child1", "/test/child2/child2");
-    List<ZNRecord> createRecords = Arrays.asList(new ZNRecord("child1"), new ZNRecord("child2"));
-
-    boolean[] needCreate = new boolean[createPaths.size()];
-    Arrays.fill(needCreate, true);
-    List<List<String>> pathsCreated =
-        new ArrayList<List<String>>(Collections.<List<String>> nCopies(createPaths.size(), null));
-    accessor.create(createPaths, createRecords, needCreate, pathsCreated, AccessOption.PERSISTENT);
-    System.out.println("pathsCreated: " + pathsCreated);
-
-    // test async set
-    List<String> setPaths = Arrays.asList("/test/setChild1/setChild1", "/test/setChild2/setChild2");
-    List<ZNRecord> setRecords = Arrays.asList(new ZNRecord("setChild1"), new ZNRecord("setChild2"));
-
-    pathsCreated =
-        new ArrayList<List<String>>(Collections.<List<String>> nCopies(setPaths.size(), null));
-    boolean[] success =
-        accessor.set(setPaths, setRecords, pathsCreated, null, AccessOption.PERSISTENT);
-    System.out.println("pathsCreated: " + pathsCreated);
-    System.out.println("setSuccess: " + Arrays.toString(success));
-
-    // test async update
-    List<String> updatePaths =
-        Arrays.asList("/test/updateChild1/updateChild1", "/test/setChild2/setChild2");
-    class TestUpdater implements DataUpdater<ZNRecord> {
-      final ZNRecord _newData;
-
-      public TestUpdater(ZNRecord newData) {
-        _newData = newData;
-      }
-
-      @Override
-      public ZNRecord update(ZNRecord currentData) {
-        return _newData;
-
-      }
-    }
-    List<DataUpdater<ZNRecord>> updaters =
-        Arrays.asList((DataUpdater<ZNRecord>) new TestUpdater(new ZNRecord("updateChild1")),
-            (DataUpdater<ZNRecord>) new TestUpdater(new ZNRecord("updateChild2")));
-
-    pathsCreated =
-        new ArrayList<List<String>>(Collections.<List<String>> nCopies(updatePaths.size(), null));
-
-    List<ZNRecord> updateRecords =
-        accessor.update(updatePaths, updaters, pathsCreated, null, AccessOption.PERSISTENT);
-    for (int i = 0; i < updatePaths.size(); i++) {
-      success[i] = updateRecords.get(i) != null;
-    }
-    System.out.println("pathsCreated: " + pathsCreated);
-    System.out.println("updateSuccess: " + Arrays.toString(success));
-
-    System.out.println("CLOSING");
-    zkclient.close();
-  }
-
   /**
    * Reset
    */

http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
index 73cd2ae..67bf46e 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
@@ -38,15 +38,17 @@ import org.apache.helix.AccessOption;
 import org.apache.helix.HelixException;
 import org.apache.helix.manager.zk.ZkAsyncCallbacks.CreateCallbackHandler;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor.RetCode;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
 import org.apache.helix.store.HelixPropertyListener;
 import org.apache.helix.store.HelixPropertyStore;
 import org.apache.helix.store.zk.ZNode;
 import org.apache.helix.util.PathUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.server.DataTree;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
   private static final Logger LOG = LoggerFactory.getLogger(ZkCacheBaseDataAccessor.class);
@@ -67,7 +69,7 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
   private final ReentrantLock _eventLock = new ReentrantLock();
   private ZkCacheEventThread _eventThread;
 
-  private ZkClient _zkclient = null;
+  private HelixZkClient _zkclient = null;
 
   public ZkCacheBaseDataAccessor(ZkBaseDataAccessor<T> baseAccessor, List<String> wtCachePaths) {
     this(baseAccessor, null, wtCachePaths, null);
@@ -109,13 +111,14 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
 
   public ZkCacheBaseDataAccessor(String zkAddress, ZkSerializer serializer, String chrootPath,
       List<String> wtCachePaths, List<String> zkCachePaths, String monitorType, String monitorkey) {
-    ZkClient.Builder zkClientBuilder = new ZkClient.Builder();
-    zkClientBuilder.setZkServer(zkAddress).setSessionTimeout(ZkClient.DEFAULT_SESSION_TIMEOUT)
-        .setConnectionTimeout(ZkClient.DEFAULT_CONNECTION_TIMEOUT).setZkSerializer(serializer)
-        .setMonitorType(monitorType).setMonitorKey(monitorkey);
-    _zkclient = zkClientBuilder.build();
-
-    _zkclient.waitUntilConnected(ZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
+    HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
+    clientConfig.setZkSerializer(serializer)
+        .setMonitorType(monitorType)
+        .setMonitorKey(monitorkey);
+    _zkclient = SharedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress), clientConfig);
+
+    _zkclient.waitUntilConnected(HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
     _baseAccessor = new ZkBaseDataAccessor<>(_zkclient);
 
     if (chrootPath == null || chrootPath.equals("/")) {
@@ -196,8 +199,8 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
         if (cache == null && path.startsWith(cachePath)) {
           cache = _cacheMap.get(cachePath);
         } else if (cache != null && cache != _cacheMap.get(cachePath)) {
-          throw new IllegalArgumentException("Couldn't do cross-cache async operations. paths: "
-              + paths);
+          throw new IllegalArgumentException(
+              "Couldn't do cross-cache async operations. paths: " + paths);
         }
       }
     }
@@ -368,9 +371,8 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
         // if cache miss, fall back to zk and update cache
         try {
           cache.lockWrite();
-          record =
-              _baseAccessor
-                  .get(serverPath, stat, options | AccessOption.THROW_EXCEPTION_IFNOTEXIST);
+          record = _baseAccessor
+              .get(serverPath, stat, options | AccessOption.THROW_EXCEPTION_IFNOTEXIST);
           cache.update(serverPath, record, stat);
         } catch (ZkNoNodeException e) {
           if (AccessOption.isThrowExceptionIfNotExist(options)) {
@@ -433,7 +435,7 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
         boolean[] needCreate = new boolean[size];
         Arrays.fill(needCreate, true);
         List<List<String>> pathsCreatedList =
-            new ArrayList<List<String>>(Collections.<List<String>> nCopies(size, null));
+            new ArrayList<List<String>>(Collections.<List<String>>nCopies(size, null));
         CreateCallbackHandler[] createCbList =
             _baseAccessor.create(serverPaths, records, needCreate, pathsCreatedList, options);
 
@@ -467,7 +469,7 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
         cache.lockWrite();
         List<Stat> setStats = new ArrayList<Stat>();
         List<List<String>> pathsCreatedList =
-            new ArrayList<List<String>>(Collections.<List<String>> nCopies(size, null));
+            new ArrayList<List<String>>(Collections.<List<String>>nCopies(size, null));
         boolean[] success =
             _baseAccessor.set(serverPaths, records, pathsCreatedList, setStats, options);
 
@@ -498,7 +500,7 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
         List<Stat> setStats = new ArrayList<Stat>();
         boolean[] success = new boolean[size];
         List<List<String>> pathsCreatedList =
-            new ArrayList<List<String>>(Collections.<List<String>> nCopies(size, null));
+            new ArrayList<List<String>>(Collections.<List<String>>nCopies(size, null));
         List<T> updateData =
             _baseAccessor.update(serverPaths, updaters, pathsCreatedList, setStats, options);
 
@@ -577,8 +579,8 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
     final int size = paths.size();
     List<String> serverPaths = prependChroot(paths);
 
-    List<T> records = new ArrayList<T>(Collections.<T> nCopies(size, null));
-    List<Stat> readStats = new ArrayList<Stat>(Collections.<Stat> nCopies(size, null));
+    List<T> records = new ArrayList<T>(Collections.<T>nCopies(size, null));
+    List<Stat> readStats = new ArrayList<Stat>(Collections.<Stat>nCopies(size, null));
 
     boolean needRead = false;
     boolean needReads[] = new boolean[size]; // init to false
@@ -647,7 +649,7 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
       // System.out.println("zk-cache");
       ZNode znode = cache.get(serverParentPath);
 
-      if (znode != null && znode.getChildSet() != Collections.<String> emptySet()) {
+      if (znode != null && znode.getChildSet() != Collections.<String>emptySet()) {
         // System.out.println("zk-cache-hit: " + parentPath);
         List<String> childNames = new ArrayList<String>(znode.getChildSet());
         Collections.sort(childNames);
@@ -689,8 +691,8 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
   }
 
   @Override
-  public List<T> getChildren(String parentPath, List<Stat> stats, int options,
-      int retryCount, int retryInterval) throws HelixException {
+  public List<T> getChildren(String parentPath, List<Stat> stats, int options, int retryCount,
+      int retryInterval) throws HelixException {
     return getChildren(parentPath, stats, options);
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
index 8762585..89676db 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
@@ -24,17 +24,36 @@ import org.I0Itec.zkclient.ZkConnection;
 import org.I0Itec.zkclient.serialize.SerializableSerializer;
 import org.I0Itec.zkclient.serialize.ZkSerializer;
 import org.apache.helix.HelixException;
+import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
- * This is a wrapper of {@link org.apache.helix.manager.zk.zookeeper.ZkClient},
+ * Raw ZkClient that wraps {@link org.apache.helix.manager.zk.zookeeper.ZkClient},
  * with additional constructors and builder.
  *
- * // TODO: we will need to merge two ZkClient into just one class.
+ * Note that, instead of directly constructing a raw ZkClient, applications should always use
+ * HelixZkClientFactory to build shared or dedicated HelixZkClient instances.
+ * Only construct a raw ZkClient when advanced usage is required,
+ * for example, when the application needs to access or manage the ZkConnection directly.
+ *
+ * Both SharedZkClient and DedicatedZkClient are built on top of the raw ZkClient, as shown below.
+ *                ----------------------------
+ *               |                            |
+ *     ---------------------                  |
+ *    |                     |                 | *implements
+ *  SharedZkClient  DedicatedZkClient           ----> HelixZkClient Interface
+ *    |                     |                 |
+ *     ---------------------                  |
+ *               |                            |
+ *           Raw ZkClient (this class)--------
+ *               |
+ *         Native ZkClient
+ *
+ * TODO Completely replace usage of the raw ZkClient within helix-core with HelixZkClient. --JJ
  */
-public class ZkClient extends org.apache.helix.manager.zk.zookeeper.ZkClient {
+
+public class ZkClient extends org.apache.helix.manager.zk.zookeeper.ZkClient implements HelixZkClient {
   private static Logger LOG = LoggerFactory.getLogger(ZkClient.class);
 
   public static final int DEFAULT_OPERATION_TIMEOUT = Integer.MAX_VALUE;

http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/main/java/org/apache/helix/manager/zk/client/DedicatedZkClientFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/client/DedicatedZkClientFactory.java b/helix-core/src/main/java/org/apache/helix/manager/zk/client/DedicatedZkClientFactory.java
new file mode 100644
index 0000000..edeb978
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/client/DedicatedZkClientFactory.java
@@ -0,0 +1,35 @@
+package org.apache.helix.manager.zk.client;
+
+import org.apache.helix.manager.zk.ZkClient;
+
+/**
+ * Singleton factory that builds dedicated clients using the raw ZkClient.
+ */
+public class DedicatedZkClientFactory extends HelixZkClientFactory {
+
+  protected DedicatedZkClientFactory() {}
+
+  private static class SingletonHelper {
+    private static final DedicatedZkClientFactory INSTANCE = new DedicatedZkClientFactory();
+  }
+
+  public static DedicatedZkClientFactory getInstance() {
+    return SingletonHelper.INSTANCE;
+  }
+
+  /**
+   * Build a dedicated ZkClient, backed by its own new ZkConnection, from the given configs.
+   * @param connectionConfig ZK server address and session timeout for the new connection
+   * @param clientConfig timeouts, serializer and monitoring settings for the client
+   * @return a new raw ZkClient; the connect-init timeout is clamped to Integer.MAX_VALUE
+   */
+  @Override
+  public HelixZkClient buildZkClient(HelixZkClient.ZkConnectionConfig connectionConfig,
+      HelixZkClient.ZkClientConfig clientConfig) {
+    return new ZkClient(createZkConnection(connectionConfig),
+        (int) Math.min(clientConfig.getConnectInitTimeout(), Integer.MAX_VALUE),
+        clientConfig.getOperationRetryTimeout(), clientConfig.getZkSerializer(),
+        clientConfig.getMonitorType(), clientConfig.getMonitorKey(),
+        clientConfig.getMonitorInstanceName(), clientConfig.isMonitorRootPathOnly());
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/main/java/org/apache/helix/manager/zk/client/HelixZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/client/HelixZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/client/HelixZkClient.java
new file mode 100644
index 0000000..65e0027
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/client/HelixZkClient.java
@@ -0,0 +1,306 @@
+package org.apache.helix.manager.zk.client;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.I0Itec.zkclient.IZkStateListener;
+import org.I0Itec.zkclient.serialize.SerializableSerializer;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.helix.manager.zk.BasicZkSerializer;
+import org.apache.helix.manager.zk.PathBasedZkSerializer;
+import org.apache.helix.manager.zk.ZkAsyncCallbacks;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.OpResult;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Helix ZkClient interface: listener subscription, data access and ZK state control APIs.
+ */
+public interface HelixZkClient {
+  int DEFAULT_OPERATION_TIMEOUT = Integer.MAX_VALUE;
+  int DEFAULT_CONNECTION_TIMEOUT = 60 * 1000;
+  int DEFAULT_SESSION_TIMEOUT = 30 * 1000;
+
+  // listener subscription
+  List<String> subscribeChildChanges(String path, IZkChildListener listener);
+
+  void unsubscribeChildChanges(String path, IZkChildListener listener);
+
+  void subscribeDataChanges(String path, IZkDataListener listener);
+
+  void unsubscribeDataChanges(String path, IZkDataListener listener);
+
+  void subscribeStateChanges(final IZkStateListener listener);
+
+  void unsubscribeStateChanges(IZkStateListener listener);
+
+  void unsubscribeAll();
+
+  // data access
+  void createPersistent(String path);
+
+  void createPersistent(String path, boolean createParents);
+
+  void createPersistent(String path, boolean createParents, List<ACL> acl);
+
+  void createPersistent(String path, Object data);
+
+  void createPersistent(String path, Object data, List<ACL> acl);
+
+  String createPersistentSequential(String path, Object data);
+
+  String createPersistentSequential(String path, Object data, List<ACL> acl);
+
+  void createEphemeral(final String path);
+
+  void createEphemeral(final String path, final List<ACL> acl);
+
+  String create(final String path, Object data, final CreateMode mode);
+
+  String create(final String path, Object data, final List<ACL> acl, final CreateMode mode);
+
+  void createEphemeral(final String path, final Object data);
+
+  void createEphemeral(final String path, final Object data, final List<ACL> acl);
+
+  String createEphemeralSequential(final String path, final Object data);
+
+  String createEphemeralSequential(final String path, final Object data, final List<ACL> acl);
+
+  List<String> getChildren(String path);
+
+  int countChildren(String path);
+
+  boolean exists(final String path);
+
+  Stat getStat(final String path);
+
+  boolean waitUntilExists(String path, TimeUnit timeUnit, long time);
+
+  void deleteRecursively(String path);
+
+  boolean delete(final String path);
+
+  <T extends Object> T readData(String path);
+
+  <T extends Object> T readData(String path, boolean returnNullIfPathNotExists);
+
+  <T extends Object> T readData(String path, Stat stat);
+
+  <T extends Object> T readData(final String path, final Stat stat, final boolean watch);
+
+  <T extends Object> T readDataAndStat(String path, Stat stat, boolean returnNullIfPathNotExists);
+
+  void writeData(String path, Object object);
+
+  <T extends Object> void updateDataSerialized(String path, DataUpdater<T> updater);
+
+  void writeData(final String path, Object data, final int expectedVersion);
+
+  Stat writeDataReturnStat(final String path, Object data, final int expectedVersion);
+
+  Stat writeDataGetStat(final String path, Object data, final int expectedVersion);
+
+  void asyncCreate(final String path, Object data, final CreateMode mode,
+      final ZkAsyncCallbacks.CreateCallbackHandler cb);
+
+  void asyncSetData(final String path, Object data, final int version,
+      final ZkAsyncCallbacks.SetDataCallbackHandler cb);
+
+  void asyncGetData(final String path, final ZkAsyncCallbacks.GetDataCallbackHandler cb);
+
+  void asyncExists(final String path, final ZkAsyncCallbacks.ExistsCallbackHandler cb);
+
+  void asyncDelete(final String path, final ZkAsyncCallbacks.DeleteCallbackHandler cb);
+
+  void watchForData(final String path);
+
+  List<String> watchForChilds(final String path);
+
+  long getCreationTime(String path);
+
+  List<OpResult> multi(final Iterable<Op> ops);
+
+  // ZK state control
+  boolean waitUntilConnected(long time, TimeUnit timeUnit);
+
+  String getServers();
+
+  long getSessionId();
+
+  void close();
+
+  boolean isClosed();
+
+  // other
+  byte[] serialize(Object data, String path);
+
+  <T extends Object> T deserialize(byte[] data, String path);
+
+  void setZkSerializer(ZkSerializer zkSerializer);
+
+  void setZkSerializer(PathBasedZkSerializer zkSerializer);
+
+  PathBasedZkSerializer getZkSerializer();
+
+  /**
+   * Configuration (ZK server address and session timeout) for creating a new ZkConnection.
+   */
+  class ZkConnectionConfig {
+    // Connection configs; _zkServers may be null, equals() and hashCode() both tolerate that
+    private final String _zkServers;
+    private int _sessionTimeout = HelixZkClient.DEFAULT_SESSION_TIMEOUT;
+
+    public ZkConnectionConfig(String zkServers) {
+      _zkServers = zkServers;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == this) {
+        return true;
+      }
+      if (!(obj instanceof ZkConnectionConfig)) {
+        return false;
+      }
+      ZkConnectionConfig configObj = (ZkConnectionConfig) obj;
+      return (_zkServers == null && configObj._zkServers == null ||
+          _zkServers != null && _zkServers.equals(configObj._zkServers)) &&
+          _sessionTimeout == configObj._sessionTimeout;
+    }
+
+    @Override
+    public int hashCode() {
+      return _sessionTimeout * 31 + (_zkServers == null ? 0 : _zkServers.hashCode());
+    }
+
+    @Override
+    public String toString() {
+      return (_zkServers + "_" + _sessionTimeout).replaceAll("[\\W]", "_");
+    }
+
+    public ZkConnectionConfig setSessionTimeout(Integer sessionTimeout) {
+      this._sessionTimeout = sessionTimeout;
+      return this;
+    }
+
+    public String getZkServers() {
+      return _zkServers;
+    }
+
+    public int getSessionTimeout() {
+      return _sessionTimeout;
+    }
+  }
+
+  /**
+   * Configuration for creating a new ZkClient: timeouts, serializer and monitoring settings.
+   */
+  class ZkClientConfig {
+    // Timeout (ms) the client uses when initializing the connection
+    private long _connectInitTimeout = HelixZkClient.DEFAULT_CONNECTION_TIMEOUT;
+
+    // Data access configs
+    private long _operationRetryTimeout = HelixZkClient.DEFAULT_OPERATION_TIMEOUT;
+
+    // Serializer; lazily defaults to BasicZkSerializer(SerializableSerializer) in the getter
+    private PathBasedZkSerializer _zkSerializer;
+
+    // Monitoring
+    private String _monitorType;
+    private String _monitorKey;
+    private String _monitorInstanceName = null;
+    private boolean _monitorRootPathOnly = true;
+
+    public ZkClientConfig setZkSerializer(PathBasedZkSerializer zkSerializer) {
+      this._zkSerializer = zkSerializer;
+      return this;
+    }
+
+    public ZkClientConfig setZkSerializer(ZkSerializer zkSerializer) {
+      this._zkSerializer = new BasicZkSerializer(zkSerializer);
+      return this;
+    }
+
+    /**
+     * Used as part of the MBean ObjectName. This item is required for enabling monitoring.
+     *
+     * @param monitorType
+     */
+    public ZkClientConfig setMonitorType(String monitorType) {
+      this._monitorType = monitorType;
+      return this;
+    }
+
+    /**
+     * Used as part of the MBean ObjectName. This item is required for enabling monitoring.
+     *
+     * @param monitorKey
+     */
+    public ZkClientConfig setMonitorKey(String monitorKey) {
+      this._monitorKey = monitorKey;
+      return this;
+    }
+
+    /**
+     * Used as part of the MBean ObjectName. This item is optional.
+     *
+     * @param instanceName
+     */
+    public ZkClientConfig setMonitorInstanceName(String instanceName) {
+      this._monitorInstanceName = instanceName;
+      return this;
+    }
+
+    public ZkClientConfig setMonitorRootPathOnly(Boolean monitorRootPathOnly) {
+      this._monitorRootPathOnly = monitorRootPathOnly;
+      return this;
+    }
+
+    public ZkClientConfig setOperationRetryTimeout(Long operationRetryTimeout) {
+      this._operationRetryTimeout = operationRetryTimeout;
+      return this;
+    }
+
+    public ZkClientConfig setConnectInitTimeout(long connectInitTimeout) {
+      this._connectInitTimeout = connectInitTimeout;
+      return this;
+    }
+
+    public PathBasedZkSerializer getZkSerializer() {
+      if (_zkSerializer == null) {
+        _zkSerializer = new BasicZkSerializer(new SerializableSerializer());
+      }
+      return _zkSerializer;
+    }
+
+    public long getOperationRetryTimeout() {
+      return _operationRetryTimeout;
+    }
+
+    public String getMonitorType() {
+      return _monitorType;
+    }
+
+    public String getMonitorKey() {
+      return _monitorKey;
+    }
+
+    public String getMonitorInstanceName() {
+      return _monitorInstanceName;
+    }
+
+    public boolean isMonitorRootPathOnly() {
+      return _monitorRootPathOnly;
+    }
+
+    public long getConnectInitTimeout() {
+      return _connectInitTimeout;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/main/java/org/apache/helix/manager/zk/client/HelixZkClientFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/client/HelixZkClientFactory.java b/helix-core/src/main/java/org/apache/helix/manager/zk/client/HelixZkClientFactory.java
new file mode 100644
index 0000000..9d10cd3
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/client/HelixZkClientFactory.java
@@ -0,0 +1,46 @@
+package org.apache.helix.manager.zk.client;
+
+import org.I0Itec.zkclient.IZkConnection;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.helix.HelixException;
+
+/**
+ * Abstract class of the ZkClient factory.
+ * Concrete factories decide how the ZkConnection behind each built client is obtained.
+ */
+abstract class HelixZkClientFactory {
+
+  /**
+   * Build a ZkClient using specified connection config and client config
+   *
+   * @param connectionConfig the ZooKeeper connection settings
+   * @param clientConfig     the client-side settings
+   * @return HelixZkClient
+   */
+  public abstract HelixZkClient buildZkClient(HelixZkClient.ZkConnectionConfig connectionConfig,
+      HelixZkClient.ZkClientConfig clientConfig);
+
+  /**
+   * Build a ZkClient using specified connection config and default client config
+   *
+   * @param connectionConfig the ZooKeeper connection settings
+   * @return HelixZkClient
+   */
+  public HelixZkClient buildZkClient(HelixZkClient.ZkConnectionConfig connectionConfig) {
+    return buildZkClient(connectionConfig, new HelixZkClient.ZkClientConfig());
+  }
+
+  /**
+   * Construct a new ZkConnection instance based on connection configuration.
+   * Note that the connection is not really made until someone calls zkConnection.connect().
+   *
+   * @param connectionConfig the connection configuration; must specify the ZK server address
+   * @return a new ZkConnection for the configured servers and session timeout
+   * @throws HelixException if the connection config is null or specifies no ZK server address
+   */
+  protected IZkConnection createZkConnection(HelixZkClient.ZkConnectionConfig connectionConfig) {
+    // A missing config is reported the same way as a missing server address instead of an NPE.
+    if (connectionConfig == null || connectionConfig.getZkServers() == null) {
+      throw new HelixException(
+          "Failed to build ZkClient since no connection or ZK server address is specified.");
+    }
+    return new ZkConnection(connectionConfig.getZkServers(), connectionConfig.getSessionTimeout());
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/main/java/org/apache/helix/manager/zk/client/SharedZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/client/SharedZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/client/SharedZkClient.java
new file mode 100644
index 0000000..242dea0
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/client/SharedZkClient.java
@@ -0,0 +1,92 @@
+package org.apache.helix.manager.zk.client;
+
+import java.util.List;
+
+import org.I0Itec.zkclient.IZkConnection;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.helix.HelixException;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ZkClient that uses a shared ZkConnection provided by a {@link ZkConnectionManager}.
+ * A SharedZkClient won't manipulate the shared ZkConnection directly: it registers itself as a
+ * watcher on the connection manager and reports {@code false} from isManagingZkConnection().
+ * Because the ZooKeeper session is shared, ephemeral nodes cannot be created through it.
+ */
+class SharedZkClient extends org.apache.helix.manager.zk.ZkClient implements HelixZkClient {
+  private static final Logger LOG = LoggerFactory.getLogger(SharedZkClient.class);
+  /*
+   * Since we cannot really disconnect the ZkConnection, we need a dummy ZkConnection placeholder.
+   * This ensures the connection is never null, even after the shared ZkClient instance is closed,
+   * so as to avoid NPE.
+   */
+  private static final ZkConnection IDLE_CONNECTION = new ZkConnection("Dummy_ZkServers");
+  private final OnCloseCallback _onCloseCallback;
+  private final ZkConnectionManager _connectionManager;
+
+  interface OnCloseCallback {
+    /**
+     * Triggered after the ZkClient is closed.
+     */
+    void onClose();
+  }
+
+  /**
+   * Construct a shared ZkClient that uses a shared ZkConnection.
+   *
+   * @param connectionManager     The manager of the shared ZkConnection.
+   * @param clientConfig          ZkClientConfig details to create the shared ZkClient.
+   * @param callback              Clean up logic when the shared ZkClient is closed.
+   * @throws HelixException if the connection manager has already been closed
+   */
+  protected SharedZkClient(ZkConnectionManager connectionManager, ZkClientConfig clientConfig,
+      OnCloseCallback callback) {
+    // NOTE(review): the 0 is presumably the connection timeout (the same argument position
+    // ZkConnectionManager fills with its connectionTimeout); the manager owns the connection.
+    super(connectionManager.getConnection(), 0, clientConfig.getOperationRetryTimeout(),
+        clientConfig.getZkSerializer(), clientConfig.getMonitorType(), clientConfig.getMonitorKey(),
+        clientConfig.getMonitorInstanceName(), clientConfig.isMonitorRootPathOnly());
+    _connectionManager = connectionManager;
+    // Finish initializing every field before publishing "this" to the manager below.
+    _onCloseCallback = callback;
+    // Register to the base dedicated ZkClient so that its events are forwarded to this client.
+    _connectionManager.registerWatcher(this);
+  }
+
+  /**
+   * Close this client. Once it is closed, it is unregistered from the connection manager and
+   * the OnCloseCallback (if any) is invoked.
+   */
+  @Override
+  public void close() {
+    super.close();
+    if (isClosed()) {
+      // close() may run before the constructor has assigned these fields (e.g. if it is invoked
+      // while the base class is still being constructed), so both are null-checked.
+      if (_connectionManager != null) {
+        _connectionManager.unregisterWatcher(this);
+      }
+      if (_onCloseCallback != null) {
+        _onCloseCallback.onClose();
+      }
+    }
+  }
+
+  /**
+   * @return the shared connection, or a dummy placeholder connection once this client is closed
+   */
+  @Override
+  public IZkConnection getConnection() {
+    if (isClosed()) {
+      return IDLE_CONNECTION;
+    }
+    return super.getConnection();
+  }
+
+  /**
+   * Since ZkConnection session is shared in this ZkClient, do not create ephemeral node using a SharedZKClient.
+   *
+   * @throws HelixException if the requested mode is ephemeral
+   */
+  @Override
+  public String create(final String path, Object data, final List<ACL> acl,
+      final CreateMode mode) {
+    if (mode.isEphemeral()) {
+      throw new HelixException(
+          "Create ephemeral nodes using a " + SharedZkClient.class.getSimpleName()
+              + " ZkClient is not supported.");
+    }
+    return super.create(path, data, acl, mode);
+  }
+
+  // The shared connection is owned by the ZkConnectionManager, not by this client.
+  @Override
+  protected boolean isManagingZkConnection() {
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/main/java/org/apache/helix/manager/zk/client/SharedZkClientFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/client/SharedZkClientFactory.java b/helix-core/src/main/java/org/apache/helix/manager/zk/client/SharedZkClientFactory.java
new file mode 100644
index 0000000..ed4b5de
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/client/SharedZkClientFactory.java
@@ -0,0 +1,87 @@
+package org.apache.helix.manager.zk.client;
+
+import java.util.HashMap;
+
+import org.apache.helix.HelixException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Singleton factory that builds shared ZkClients backed by pooled ZkConnections.
+ * Clients built with equal connection configs share one ZkConnectionManager while it is open.
+ */
+public class SharedZkClientFactory extends HelixZkClientFactory {
+  private static final Logger LOG = LoggerFactory.getLogger(SharedZkClientFactory.class);
+  // The connection pool to track all created connections, keyed by connection config.
+  // Every access synchronizes on the map itself.
+  private final HashMap<HelixZkClient.ZkConnectionConfig, ZkConnectionManager>
+      _connectionManagerPool = new HashMap<>();
+
+  protected SharedZkClientFactory() {}
+
+  // Holder class: INSTANCE is created when getInstance() first touches SingletonHelper.
+  private static class SingletonHelper {
+    private static final SharedZkClientFactory INSTANCE = new SharedZkClientFactory();
+  }
+
+  /**
+   * @return the singleton SharedZkClientFactory
+   */
+  public static SharedZkClientFactory getInstance() {
+    return SingletonHelper.INSTANCE;
+  }
+
+  /**
+   * Build a Shared ZkClient that uses sharing ZkConnection that is created based on the specified connection config.
+   *
+   * @param connectionConfig The connection configuration that is used to search for a shared connection. Or create new connection if necessary.
+   * @param clientConfig     Client-side settings (serializer, monitoring, timeouts) of the new client.
+   * @return Shared ZkClient
+   * @throws HelixException if no connection manager can be obtained for the connection config
+   */
+  @Override
+  public HelixZkClient buildZkClient(final HelixZkClient.ZkConnectionConfig connectionConfig,
+      HelixZkClient.ZkClientConfig clientConfig) {
+    synchronized (_connectionManagerPool) {
+      final ZkConnectionManager zkConnectionManager =
+          getOrCreateZkConnectionManager(connectionConfig, clientConfig.getConnectInitTimeout());
+      if (zkConnectionManager == null) {
+        throw new HelixException("Failed to create a connection manager in the pool to share.");
+      }
+      LOG.info("Sharing ZkConnection {} to a new SharedZkClient.", connectionConfig.toString());
+      try {
+        return new SharedZkClient(zkConnectionManager, clientConfig,
+            new SharedZkClient.OnCloseCallback() {
+              @Override
+              public void onClose() {
+                cleanupConnectionManager(connectionConfig, zkConnectionManager);
+              }
+            });
+      } catch (RuntimeException e) {
+        // Don't leak a connection that no client ended up using. This is a no-op while other
+        // live clients are still registered on the manager.
+        try {
+          cleanupConnectionManager(connectionConfig, zkConnectionManager);
+        } catch (RuntimeException cleanupFailure) {
+          e.addSuppressed(cleanupFailure);
+        }
+        throw e;
+      }
+    }
+  }
+
+  // Return the pooled manager for the config, creating (and pooling) a new one when there is none
+  // or the pooled one is closed. Callers must hold the _connectionManagerPool lock.
+  private ZkConnectionManager getOrCreateZkConnectionManager(
+      HelixZkClient.ZkConnectionConfig connectionConfig, long connectInitTimeout) {
+    ZkConnectionManager connectionManager = _connectionManagerPool.get(connectionConfig);
+    if (connectionManager == null || connectionManager.isClosed()) {
+      connectionManager = new ZkConnectionManager(createZkConnection(connectionConfig),
+          connectInitTimeout, connectionConfig.toString());
+      _connectionManagerPool.put(connectionConfig, connectionManager);
+    }
+    return connectionManager;
+  }
+
+  // Close the ZkConnectionManager if no other shared client is referring to it, and drop it from
+  // the pool once it is closed so that closed managers are not retained.
+  // Note the close operation of connection manager needs to be synchronized with the pool operation
+  // to avoid race condition.
+  private void cleanupConnectionManager(HelixZkClient.ZkConnectionConfig connectionConfig,
+      ZkConnectionManager zkConnectionManager) {
+    synchronized (_connectionManagerPool) {
+      zkConnectionManager.close(true);
+      // Only remove the entry if it still maps to this manager; a newer one may have replaced it.
+      if (zkConnectionManager.isClosed()
+          && _connectionManagerPool.get(connectionConfig) == zkConnectionManager) {
+        _connectionManagerPool.remove(connectionConfig);
+      }
+    }
+  }
+
+  // For test only
+  protected int getActiveConnectionCount() {
+    int count = 0;
+    synchronized (_connectionManagerPool) {
+      for (ZkConnectionManager manager : _connectionManagerPool.values()) {
+        if (!manager.isClosed()) {
+          count++;
+        }
+      }
+    }
+    return count;
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/main/java/org/apache/helix/manager/zk/client/ZkConnectionManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/client/ZkConnectionManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/client/ZkConnectionManager.java
new file mode 100644
index 0000000..0a9ddc1
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/client/ZkConnectionManager.java
@@ -0,0 +1,120 @@
+package org.apache.helix.manager.zk.client;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.I0Itec.zkclient.IZkConnection;
+import org.I0Itec.zkclient.serialize.SerializableSerializer;
+import org.apache.helix.HelixException;
+import org.apache.helix.manager.zk.BasicZkSerializer;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A ZkConnection manager that maintains connection status and allows additional watchers to be
+ * registered. Every event it processes is forwarded to those watchers.
+ *
+ * TODO Separate connection management logic from the raw ZkClient class.
+ * So this manager is a peer to the ZkClient. Connection Manager for maintaining the connection and
+ * ZkClient to handle user request.
+ * After this is done, Dedicated ZkClient hires one manager for its connection.
+ * While multiple Shared ZkClients can use single connection manager if possible.
+ */
+class ZkConnectionManager extends org.apache.helix.manager.zk.ZkClient {
+  private static final Logger LOG = LoggerFactory.getLogger(ZkConnectionManager.class);
+  // Client type that is used in monitor, and metrics.
+  private static final String MONITOR_TYPE = "ZkConnectionManager";
+  private final String _monitorKey;
+  // Set of all registered watchers; only touched from synchronized methods.
+  // Still null while the base-class constructor runs (field initializers run after super()).
+  private final Set<Watcher> _sharedWatchers = new HashSet<>();
+
+  /**
+   * Construct and init a ZkConnection Manager.
+   *
+   * @param zkConnection      the connection to manage
+   * @param connectionTimeout timeout for establishing the connection; values above
+   *                          Integer.MAX_VALUE are clamped instead of overflowing the int cast
+   * @param monitorKey        key used for the monitor and in log messages
+   */
+  protected ZkConnectionManager(IZkConnection zkConnection, long connectionTimeout,
+      String monitorKey) {
+    super(zkConnection, (int) Math.min(connectionTimeout, Integer.MAX_VALUE),
+        HelixZkClient.DEFAULT_OPERATION_TIMEOUT, new BasicZkSerializer(new SerializableSerializer()),
+        MONITOR_TYPE, monitorKey, null, true);
+    _monitorKey = monitorKey;
+    LOG.info("ZkConnection {} was created for sharing.", _monitorKey);
+  }
+
+  /**
+   * Register event watcher.
+   *
+   * @param watcher
+   * @return true if the watcher is newly added. false if it is already in record.
+   * @throws HelixException if this manager is already closed
+   */
+  protected synchronized boolean registerWatcher(Watcher watcher) {
+    if (isClosed()) {
+      throw new HelixException("Cannot add watcher to a closed client.");
+    }
+    return _sharedWatchers.add(watcher);
+  }
+
+  /**
+   * Unregister the event watcher.
+   *
+   * @param watcher
+   * @return number of the remaining event watchers
+   */
+  protected synchronized int unregisterWatcher(Watcher watcher) {
+    _sharedWatchers.remove(watcher);
+    return _sharedWatchers.size();
+  }
+
+  @Override
+  public void process(final WatchedEvent event) {
+    // Let the base client handle the event first, then fan it out to the registered watchers.
+    super.process(event);
+    forwardingEvent(event);
+  }
+
+  private synchronized void forwardingEvent(final WatchedEvent event) {
+    // note that process (then forwardingEvent) could be triggered during construction, when sharedWatchers is still null.
+    if (_sharedWatchers == null || _sharedWatchers.isEmpty()) {
+      return;
+    }
+    // forward event to all the watchers' event queue
+    for (final Watcher watcher : _sharedWatchers) {
+      watcher.process(event);
+    }
+  }
+
+  /**
+   * Close the connection. Enforced close: throws HelixException if any open watcher is still
+   * registered.
+   */
+  @Override
+  public void close() {
+    close(false);
+  }
+
+  /**
+   * Close the connection once no open watcher is registered any more.
+   *
+   * @param skipIfWatched if true, silently skip closing while watchers exist; otherwise throw
+   * @throws HelixException if watchers exist and skipIfWatched is false
+   */
+  protected synchronized void close(boolean skipIfWatched) {
+    // Like process(), close() can be reached while the base-class constructor is still running
+    // (before _sharedWatchers is initialized). There are no watchers yet in that case, and
+    // dereferencing the null set would hide the original failure and skip super.close().
+    if (_sharedWatchers != null) {
+      cleanupInactiveWatchers();
+      if (!_sharedWatchers.isEmpty()) {
+        if (skipIfWatched) {
+          LOG.debug("Skip closing ZkConnection due to existing watchers. Watcher count {}.",
+              _sharedWatchers.size());
+          return;
+        } else {
+          throw new HelixException(
+              "Cannot close the connection when there are still shared watchers listen on the event.");
+        }
+      }
+    }
+    super.close();
+    LOG.info("ZkConnection {} was closed.", _monitorKey);
+  }
+
+  // Drop registered SharedZkClients that are already closed. Caller must hold the lock on "this".
+  private void cleanupInactiveWatchers() {
+    Set<Watcher> closedWatchers = new HashSet<>();
+    for (Watcher watcher : _sharedWatchers) {
+      // TODO ideally, we shall have a ClosableWatcher interface so as to check accordingly. -- JJ
+      if (watcher instanceof SharedZkClient && ((SharedZkClient) watcher).isClosed()) {
+        closedWatchers.add(watcher);
+      }
+    }
+    _sharedWatchers.removeAll(closedWatchers);
+  }
+}