You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2020/04/01 22:47:21 UTC

[helix] 26/49: Add SharedZkClient/InnerSharedZkClient implementation (#796)

This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch zooscalability
in repository https://gitbox.apache.org/repos/asf/helix.git

commit edda27390a445fca8f9305ddead4adecaa1015ee
Author: kaisun2000 <52...@users.noreply.github.com>
AuthorDate: Thu Feb 27 14:55:41 2020 -0800

    Add SharedZkClient/InnerSharedZkClient implementation (#796)
    
    Refactor the original SharedZkClient into InnerSharedZkClient. Add
    a new SharedZkClient implementation. The implementation uses the
    composition pattern: it checks the validity of the ZK path and
    delegates the operation to InnerSharedZkClient. In sum,
    InnerSharedZkClient is a shared ZkClient that is not realm-aware,
    while SharedZkClient is a truly realm-aware ZkClient.
---
 .../helix/manager/zk/client/SharedZkClient.java    |  16 +-
 .../helix/zookeeper/api/client/HelixZkClient.java  |  69 +--
 .../zookeeper/api/client/RealmAwareZkClient.java   |  14 +-
 .../api/factory/RealmAwareZkClientFactory.java     |   1 +
 .../zookeeper/impl/client/SharedZkClient.java      | 523 ++++++++++++++++++---
 .../impl/factory/SharedZkClientFactory.java        | 107 ++++-
 .../impl/client/RealmAwareZkClientTestBase.java    |   8 +-
 .../zookeeper/impl/client/TestSharedZkClient.java  |  47 ++
 8 files changed, 635 insertions(+), 150 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/client/SharedZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/client/SharedZkClient.java
index 52a2c17..be6b54d 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/client/SharedZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/client/SharedZkClient.java
@@ -19,19 +19,17 @@ package org.apache.helix.manager.zk.client;
  * under the License.
  */
 
+import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
+import org.apache.helix.zookeeper.impl.factory.ZkConnectionManager;
+
+
 /**
  * Deprecated; use SharedZkClient in zookeeper-api instead.
  */
 @Deprecated
-class SharedZkClient extends org.apache.helix.zookeeper.impl.client.SharedZkClient {
-  /**
-   * Construct a shared RealmAwareZkClient that uses a shared ZkConnection.
-   *  @param connectionManager     The manager of the shared ZkConnection.
-   * @param clientConfig          ZkClientConfig details to create the shared RealmAwareZkClient.
-   * @param callback              Clean up logic when the shared RealmAwareZkClient is closed.
-   */
-  protected SharedZkClient(ZkConnectionManager connectionManager, ZkClientConfig clientConfig,
-      org.apache.helix.zookeeper.impl.client.SharedZkClient.OnCloseCallback callback) {
+class SharedZkClient extends SharedZkClientFactory.InnerSharedZkClient {
+  SharedZkClient(ZkConnectionManager connectionManager, ZkClientConfig clientConfig,
+      SharedZkClientFactory.OnCloseCallback callback) {
     super(connectionManager, clientConfig, callback);
   }
 }
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/HelixZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/HelixZkClient.java
index 9a1a69d..03bf000 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/HelixZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/HelixZkClient.java
@@ -19,9 +19,9 @@ package org.apache.helix.zookeeper.api.client;
  * under the License.
  */
 
+import org.apache.helix.zookeeper.zkclient.ZkClient;
 import org.apache.helix.zookeeper.zkclient.serialize.BasicZkSerializer;
 import org.apache.helix.zookeeper.zkclient.serialize.PathBasedZkSerializer;
-import org.apache.helix.zookeeper.zkclient.serialize.SerializableSerializer;
 import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
 
 
@@ -42,7 +42,7 @@ public interface HelixZkClient extends RealmAwareZkClient {
   class ZkConnectionConfig {
     // Connection configs
     private final String _zkServers;
-    private int _sessionTimeout = HelixZkClient.DEFAULT_SESSION_TIMEOUT;
+    private int _sessionTimeout = DEFAULT_SESSION_TIMEOUT;
 
     public ZkConnectionConfig(String zkServers) {
       _zkServers = zkServers;
@@ -53,10 +53,10 @@ public interface HelixZkClient extends RealmAwareZkClient {
       if (obj == this) {
         return true;
       }
-      if (!(obj instanceof HelixZkClient.ZkConnectionConfig)) {
+      if (!(obj instanceof ZkConnectionConfig)) {
         return false;
       }
-      HelixZkClient.ZkConnectionConfig configObj = (HelixZkClient.ZkConnectionConfig) obj;
+      ZkConnectionConfig configObj = (ZkConnectionConfig) obj;
       return (_zkServers == null && configObj._zkServers == null || _zkServers != null && _zkServers
           .equals(configObj._zkServers)) && _sessionTimeout == configObj._sessionTimeout;
     }
@@ -71,7 +71,7 @@ public interface HelixZkClient extends RealmAwareZkClient {
       return (_zkServers + "_" + _sessionTimeout).replaceAll("[\\W]", "_");
     }
 
-    public HelixZkClient.ZkConnectionConfig setSessionTimeout(Integer sessionTimeout) {
+    public ZkConnectionConfig setSessionTimeout(Integer sessionTimeout) {
       this._sessionTimeout = sessionTimeout;
       return this;
     }
@@ -89,32 +89,16 @@ public interface HelixZkClient extends RealmAwareZkClient {
    * Deprecated - please use RealmAwareZkClient and RealmAwareZkClientConfig instead.
    *
    * Configuration for creating a new HelixZkClient with serializer and monitor.
-   *
-   * TODO: If possible, try to merge with RealmAwareZkClient's RealmAwareZkClientConfig to reduce duplicate logic/code (without breaking backward-compatibility).
-   * Simply making this a subclass of RealmAwareZkClientConfig will break backward-compatiblity.
    */
   @Deprecated
-  class ZkClientConfig {
-    // For client to init the connection
-    private long _connectInitTimeout = DEFAULT_CONNECTION_TIMEOUT;
-
-    // Data access configs
-    private long _operationRetryTimeout = DEFAULT_OPERATION_TIMEOUT;
-
-    // Others
-    private PathBasedZkSerializer _zkSerializer;
-
-    // Monitoring
-    private String _monitorType;
-    private String _monitorKey;
-    private String _monitorInstanceName = null;
-    private boolean _monitorRootPathOnly = true;
-
+  class ZkClientConfig extends RealmAwareZkClientConfig {
+    @Override
     public ZkClientConfig setZkSerializer(PathBasedZkSerializer zkSerializer) {
       this._zkSerializer = zkSerializer;
       return this;
     }
 
+    @Override
     public ZkClientConfig setZkSerializer(ZkSerializer zkSerializer) {
       this._zkSerializer = new BasicZkSerializer(zkSerializer);
       return this;
@@ -125,6 +109,7 @@ public interface HelixZkClient extends RealmAwareZkClient {
      *
      * @param monitorType
      */
+    @Override
     public ZkClientConfig setMonitorType(String monitorType) {
       this._monitorType = monitorType;
       return this;
@@ -135,6 +120,7 @@ public interface HelixZkClient extends RealmAwareZkClient {
      *
      * @param monitorKey
      */
+    @Override
     public ZkClientConfig setMonitorKey(String monitorKey) {
       this._monitorKey = monitorKey;
       return this;
@@ -145,55 +131,28 @@ public interface HelixZkClient extends RealmAwareZkClient {
      *
      * @param instanceName
      */
+    @Override
     public ZkClientConfig setMonitorInstanceName(String instanceName) {
       this._monitorInstanceName = instanceName;
       return this;
     }
 
+    @Override
     public ZkClientConfig setMonitorRootPathOnly(Boolean monitorRootPathOnly) {
       this._monitorRootPathOnly = monitorRootPathOnly;
       return this;
     }
 
+    @Override
     public ZkClientConfig setOperationRetryTimeout(Long operationRetryTimeout) {
       this._operationRetryTimeout = operationRetryTimeout;
       return this;
     }
 
+    @Override
     public ZkClientConfig setConnectInitTimeout(long _connectInitTimeout) {
       this._connectInitTimeout = _connectInitTimeout;
       return this;
     }
-
-    public PathBasedZkSerializer getZkSerializer() {
-      if (_zkSerializer == null) {
-        _zkSerializer = new BasicZkSerializer(new SerializableSerializer());
-      }
-      return _zkSerializer;
-    }
-
-    public long getOperationRetryTimeout() {
-      return _operationRetryTimeout;
-    }
-
-    public String getMonitorType() {
-      return _monitorType;
-    }
-
-    public String getMonitorKey() {
-      return _monitorKey;
-    }
-
-    public String getMonitorInstanceName() {
-      return _monitorInstanceName;
-    }
-
-    public boolean isMonitorRootPathOnly() {
-      return _monitorRootPathOnly;
-    }
-
-    public long getConnectInitTimeout() {
-      return _connectInitTimeout;
-    }
   }
 }
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java
index aa8bf7e..e466d36 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java
@@ -380,19 +380,19 @@ public interface RealmAwareZkClient {
    */
   class RealmAwareZkClientConfig {
     // For client to init the connection
-    private long _connectInitTimeout = DEFAULT_CONNECTION_TIMEOUT;
+    protected long _connectInitTimeout = DEFAULT_CONNECTION_TIMEOUT;
 
     // Data access configs
-    private long _operationRetryTimeout = DEFAULT_OPERATION_TIMEOUT;
+    protected long _operationRetryTimeout = DEFAULT_OPERATION_TIMEOUT;
 
     // Others
-    private PathBasedZkSerializer _zkSerializer;
+    protected PathBasedZkSerializer _zkSerializer;
 
     // Monitoring
-    private String _monitorType;
-    private String _monitorKey;
-    private String _monitorInstanceName = null;
-    private boolean _monitorRootPathOnly = true;
+    protected String _monitorType;
+    protected String _monitorKey;
+    protected String _monitorInstanceName = null;
+    protected boolean _monitorRootPathOnly = true;
 
     public RealmAwareZkClientConfig setZkSerializer(PathBasedZkSerializer zkSerializer) {
       this._zkSerializer = zkSerializer;
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/factory/RealmAwareZkClientFactory.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/factory/RealmAwareZkClientFactory.java
index 8c1f7a3..cdfa778 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/factory/RealmAwareZkClientFactory.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/factory/RealmAwareZkClientFactory.java
@@ -45,6 +45,7 @@ public interface RealmAwareZkClientFactory {
    * @param metadataStoreRoutingData
    * @return RealmAwareZkClient
    */
+
   // TODO: remove MetadataStoreRoutingData
   default RealmAwareZkClient buildZkClient(
       RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig,
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/SharedZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/SharedZkClient.java
index 70d58a8..f2d9416 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/SharedZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/SharedZkClient.java
@@ -20,13 +20,27 @@ package org.apache.helix.zookeeper.impl.client;
  */
 
 import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.helix.msdcommon.datamodel.MetadataStoreRoutingData;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
-import org.apache.helix.zookeeper.impl.factory.ZkConnectionManager;
-import org.apache.helix.zookeeper.zkclient.IZkConnection;
-import org.apache.helix.zookeeper.zkclient.ZkConnection;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
+import org.apache.helix.zookeeper.zkclient.DataUpdater;
+import org.apache.helix.zookeeper.zkclient.IZkChildListener;
+import org.apache.helix.zookeeper.zkclient.IZkDataListener;
+import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks;
+import org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener;
+import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException;
+import org.apache.helix.zookeeper.zkclient.serialize.PathBasedZkSerializer;
+import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
 import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.OpResult;
+import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,79 +51,460 @@ import org.slf4j.LoggerFactory;
  * HelixZkClient that uses shared ZkConnection.
  * A SharedZkClient won't manipulate the shared ZkConnection directly.
  */
-public class SharedZkClient extends ZkClient implements HelixZkClient {
+public class SharedZkClient implements RealmAwareZkClient {
   private static Logger LOG = LoggerFactory.getLogger(SharedZkClient.class);
-  /*
-   * Since we cannot really disconnect the ZkConnection, we need a dummy ZkConnection placeholder.
-   * This is to ensure connection field is never null even the shared RealmAwareZkClient instance is closed so as to avoid NPE.
-   */
-  private final static ZkConnection IDLE_CONNECTION = new ZkConnection("Dummy_ZkServers");
-  private final OnCloseCallback _onCloseCallback;
-  private final ZkConnectionManager _connectionManager;
-
-  public interface OnCloseCallback {
-    /**
-     * Triggered after the RealmAwareZkClient is closed.
-     */
-    void onClose();
-  }
-
-  /**
-   * Construct a shared RealmAwareZkClient that uses a shared ZkConnection.
-   *
-   * @param connectionManager     The manager of the shared ZkConnection.
-   * @param clientConfig          ZkClientConfig details to create the shared RealmAwareZkClient.
-   * @param callback              Clean up logic when the shared RealmAwareZkClient is closed.
-   */
-  public SharedZkClient(ZkConnectionManager connectionManager, ZkClientConfig clientConfig,
-      OnCloseCallback callback) {
-    super(connectionManager.getConnection(), 0, clientConfig.getOperationRetryTimeout(),
-        clientConfig.getZkSerializer(), clientConfig.getMonitorType(), clientConfig.getMonitorKey(),
-        clientConfig.getMonitorInstanceName(), clientConfig.isMonitorRootPathOnly());
-    _connectionManager = connectionManager;
-    // Register to the base dedicated RealmAwareZkClient
-    _connectionManager.registerWatcher(this);
-    _onCloseCallback = callback;
+
+  private final HelixZkClient _innerSharedZkClient;
+  private final String _zkRealmShardingKey;
+  private final MetadataStoreRoutingData _metadataStoreRoutingData;
+  private final String _zkRealmAddress;
+
+  public SharedZkClient(RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig,
+      RealmAwareZkClient.RealmAwareZkClientConfig clientConfig,
+      MetadataStoreRoutingData metadataStoreRoutingData) {
+
+    if (connectionConfig == null) {
+      throw new IllegalArgumentException("RealmAwareZkConnectionConfig cannot be null!");
+    }
+    _zkRealmShardingKey = connectionConfig.getZkRealmShardingKey();
+
+    if (metadataStoreRoutingData == null) {
+      throw new IllegalArgumentException("MetadataStoreRoutingData cannot be null!");
+    }
+    _metadataStoreRoutingData = metadataStoreRoutingData;
+
+    // TODO: use _zkRealmShardingKey to generate zkRealmAddress. This can be done the same way as in pull 765 once @hunter checks it in.
+    // Get the ZkRealm address based on the ZK path sharding key
+    String zkRealmAddress = _metadataStoreRoutingData.getMetadataStoreRealm(_zkRealmShardingKey);
+    if (zkRealmAddress == null || zkRealmAddress.isEmpty()) {
+      throw new IllegalArgumentException(
+          "ZK realm address for the given ZK realm sharding key is invalid! ZK realm address: "
+              + zkRealmAddress + " ZK realm sharding key: " + _zkRealmShardingKey);
+    }
+    _zkRealmAddress = zkRealmAddress;
+
+    // Create an InnerSharedZkClient to actually serve ZK requests
+    // TODO: Rename HelixZkClient in the future or remove it entirely - this will be a backward-compatibility breaking change because HelixZkClient is being used by Helix users.
+
+    // Note: the delegate _innerSharedZkClient shares the same connectionManager. When the close() API of
+    // SharedZkClient is invoked, we simply call the close() API of the delegate _innerSharedZkClient. This follows
+    // exactly the closing logic of InnerSharedZkClient, which closes the connectionManager when the last
+    // InnerSharedZkClient is closed.
+    HelixZkClient.ZkConnectionConfig zkConnectionConfig =
+        new HelixZkClient.ZkConnectionConfig(zkRealmAddress)
+            .setSessionTimeout(connectionConfig.getSessionTimeout());
+    HelixZkClient.ZkClientConfig zkClientConfig = new HelixZkClient.ZkClientConfig();
+    zkClientConfig.setZkSerializer(clientConfig.getZkSerializer())
+        .setConnectInitTimeout(clientConfig.getConnectInitTimeout())
+        .setOperationRetryTimeout(clientConfig.getOperationRetryTimeout())
+        .setMonitorInstanceName(clientConfig.getMonitorInstanceName())
+        .setMonitorKey(clientConfig.getMonitorKey()).setMonitorType(clientConfig.getMonitorType())
+        .setMonitorRootPathOnly(clientConfig.isMonitorRootPathOnly());
+    _innerSharedZkClient =
+        SharedZkClientFactory.getInstance().buildZkClient(zkConnectionConfig, zkClientConfig);
+  }
+
+  @Override
+  public List<String> subscribeChildChanges(String path, IZkChildListener listener) {
+    checkIfPathContainsShardingKey(path);
+    return _innerSharedZkClient.subscribeChildChanges(path, listener);
+  }
+
+  @Override
+  public void unsubscribeChildChanges(String path, IZkChildListener listener) {
+    checkIfPathContainsShardingKey(path);
+    _innerSharedZkClient.unsubscribeChildChanges(path, listener);
+  }
+
+  @Override
+  public void subscribeDataChanges(String path, IZkDataListener listener) {
+    checkIfPathContainsShardingKey(path);
+    _innerSharedZkClient.subscribeDataChanges(path, listener);
+  }
+
+  @Override
+  public void unsubscribeDataChanges(String path, IZkDataListener listener) {
+    checkIfPathContainsShardingKey(path);
+    _innerSharedZkClient.unsubscribeDataChanges(path, listener);
+  }
+
+  @Override
+  public void subscribeStateChanges(IZkStateListener listener) {
+    _innerSharedZkClient.subscribeStateChanges(listener);
+  }
+
+  @Override
+  public void unsubscribeStateChanges(IZkStateListener listener) {
+    _innerSharedZkClient.unsubscribeStateChanges(listener);
+  }
+
+  @Override
+  public void unsubscribeAll() {
+    _innerSharedZkClient.unsubscribeAll();
+  }
+
+  @Override
+  public void createPersistent(String path) {
+    checkIfPathContainsShardingKey(path);
+    _innerSharedZkClient.createPersistent(path);
+  }
+
+  @Override
+  public void createPersistent(String path, boolean createParents) {
+    checkIfPathContainsShardingKey(path);
+    _innerSharedZkClient.createPersistent(path, createParents);
+  }
+
+  @Override
+  public void createPersistent(String path, boolean createParents, List<ACL> acl) {
+    checkIfPathContainsShardingKey(path);
+    _innerSharedZkClient.createPersistent(path, createParents, acl);
+  }
+
+  @Override
+  public void createPersistent(String path, Object data) {
+    checkIfPathContainsShardingKey(path);
+    _innerSharedZkClient.createPersistent(path, data);
+  }
+
+  @Override
+  public void createPersistent(String path, Object data, List<ACL> acl) {
+    checkIfPathContainsShardingKey(path);
+    _innerSharedZkClient.createPersistent(path, data, acl);
+  }
+
+  @Override
+  public String createPersistentSequential(String path, Object data) {
+    checkIfPathContainsShardingKey(path);
+    return _innerSharedZkClient.createPersistentSequential(path, data);
+  }
+
+  @Override
+  public String createPersistentSequential(String path, Object data, List<ACL> acl) {
+    checkIfPathContainsShardingKey(path);
+    return _innerSharedZkClient.createPersistentSequential(path, data, acl);
+  }
+
+  @Override
+  public void createEphemeral(String path) {
+    throw new UnsupportedOperationException(
+        "Creating ephemeral nodes using " + SharedZkClient.class.getSimpleName()
+            + " is not supported.");
+  }
+
+  @Override
+  public void createEphemeral(String path, String sessionId) {
+    throw new UnsupportedOperationException(
+        "Creating ephemeral nodes using " + SharedZkClient.class.getSimpleName()
+            + " is not supported.");
+  }
+
+  @Override
+  public void createEphemeral(String path, List<ACL> acl) {
+    throw new UnsupportedOperationException(
+        "Creating ephemeral nodes using " + SharedZkClient.class.getSimpleName()
+            + " is not supported.");
+  }
+
+  @Override
+  public void createEphemeral(String path, List<ACL> acl, String sessionId) {
+    throw new UnsupportedOperationException(
+        "Creating ephemeral nodes using " + SharedZkClient.class.getSimpleName()
+            + " is not supported.");
+  }
+
+  @Override
+  public String create(String path, Object data, CreateMode mode) {
+    checkIfPathContainsShardingKey(path);
+    // Delegating to _innerSharedZkClient is fine because _innerSharedZkClient does not allow creating ephemeral nodes,
+    // so this still keeps the same behavior.
+    return _innerSharedZkClient.create(path, data, mode);
+  }
+
+  @Override
+  public String create(String path, Object datat, List<ACL> acl, CreateMode mode) {
+    checkIfPathContainsShardingKey(path);
+    return _innerSharedZkClient.create(path, datat, acl, mode);
+  }
+
+  @Override
+  public void createEphemeral(String path, Object data) {
+    throw new UnsupportedOperationException(
+        "Creating ephemeral nodes using " + SharedZkClient.class.getSimpleName()
+            + " is not supported.");
+  }
+
+  @Override
+  public void createEphemeral(String path, Object data, String sessionId) {
+    throw new UnsupportedOperationException(
+        "Creating ephemeral nodes using " + SharedZkClient.class.getSimpleName()
+            + " is not supported.");
+  }
+
+  @Override
+  public void createEphemeral(String path, Object data, List<ACL> acl) {
+    throw new UnsupportedOperationException(
+        "Creating ephemeral nodes using " + SharedZkClient.class.getSimpleName()
+            + " is not supported.");
+  }
+
+  @Override
+  public void createEphemeral(String path, Object data, List<ACL> acl, String sessionId) {
+    throw new UnsupportedOperationException(
+        "Creating ephemeral nodes using " + SharedZkClient.class.getSimpleName()
+            + " is not supported.");
+  }
+
+  @Override
+  public String createEphemeralSequential(String path, Object data) {
+    throw new UnsupportedOperationException(
+        "Creating ephemeral nodes using " + SharedZkClient.class.getSimpleName()
+            + " is not supported.");
+  }
+
+  @Override
+  public String createEphemeralSequential(String path, Object data, List<ACL> acl) {
+    throw new UnsupportedOperationException(
+        "Creating ephemeral nodes using " + SharedZkClient.class.getSimpleName()
+            + " is not supported.");
+  }
+
+  @Override
+  public String createEphemeralSequential(String path, Object data, String sessionId) {
+    throw new UnsupportedOperationException(
+        "Creating ephemeral nodes using " + SharedZkClient.class.getSimpleName()
+            + " is not supported.");
+  }
+
+  @Override
+  public String createEphemeralSequential(String path, Object data, List<ACL> acl,
+      String sessionId) {
+    throw new UnsupportedOperationException(
+        "Creating ephemeral nodes using " + SharedZkClient.class.getSimpleName()
+            + " is not supported.");
+  }
+
+  @Override
+  public List<String> getChildren(String path) {
+    checkIfPathContainsShardingKey(path);
+    return _innerSharedZkClient.getChildren(path);
+  }
+
+  @Override
+  public int countChildren(String path) {
+    checkIfPathContainsShardingKey(path);
+    return _innerSharedZkClient.countChildren(path);
+  }
+
+  @Override
+  public boolean exists(String path) {
+    checkIfPathContainsShardingKey(path);
+    return _innerSharedZkClient.exists(path);
+  }
+
+  @Override
+  public Stat getStat(String path) {
+    checkIfPathContainsShardingKey(path);
+    return _innerSharedZkClient.getStat(path);
+  }
+
+  @Override
+  public boolean waitUntilExists(String path, TimeUnit timeUnit, long time) {
+    checkIfPathContainsShardingKey(path);
+    return _innerSharedZkClient.waitUntilExists(path, timeUnit, time);
+  }
+
+  @Override
+  public void deleteRecursively(String path) {
+    checkIfPathContainsShardingKey(path);
+    _innerSharedZkClient.deleteRecursively(path);
+  }
+
+  @Override
+  public boolean delete(String path) {
+    checkIfPathContainsShardingKey(path);
+    return _innerSharedZkClient.delete(path);
+  }
+
+  @Override
+  public <T> T readData(String path) {
+    checkIfPathContainsShardingKey(path);
+    return _innerSharedZkClient.readData(path);
+  }
+
+  @Override
+  public <T> T readData(String path, boolean returnNullIfPathNotExists) {
+    checkIfPathContainsShardingKey(path);
+    return _innerSharedZkClient.readData(path, returnNullIfPathNotExists);
+  }
+
+  @Override
+  public <T> T readData(String path, Stat stat) {
+    checkIfPathContainsShardingKey(path);
+    return _innerSharedZkClient.readData(path, stat);
+  }
+
+  @Override
+  public <T> T readData(String path, Stat stat, boolean watch) {
+    checkIfPathContainsShardingKey(path);
+    return _innerSharedZkClient.readData(path, stat, watch);
+  }
+
+  @Override
+  public <T> T readDataAndStat(String path, Stat stat, boolean returnNullIfPathNotExists) {
+    checkIfPathContainsShardingKey(path);
+    return _innerSharedZkClient.readDataAndStat(path, stat, returnNullIfPathNotExists);
+  }
+
+  @Override
+  public void writeData(String path, Object object) {
+    checkIfPathContainsShardingKey(path);
+    _innerSharedZkClient.writeData(path, object);
+  }
+
+  @Override
+  public <T> void updateDataSerialized(String path, DataUpdater<T> updater) {
+    checkIfPathContainsShardingKey(path);
+    _innerSharedZkClient.updateDataSerialized(path, updater);
+  }
+
+  @Override
+  public void writeData(String path, Object datat, int expectedVersion) {
+    checkIfPathContainsShardingKey(path);
+    _innerSharedZkClient.writeDataReturnStat(path, datat, expectedVersion);
+  }
+
+  @Override
+  public Stat writeDataReturnStat(String path, Object datat, int expectedVersion) {
+    checkIfPathContainsShardingKey(path);
+    return _innerSharedZkClient.writeDataReturnStat(path, datat, expectedVersion);
+  }
+
+  @Override
+  public Stat writeDataGetStat(String path, Object datat, int expectedVersion) {
+    checkIfPathContainsShardingKey(path);
+    return _innerSharedZkClient.writeDataReturnStat(path, datat, expectedVersion);
+  }
+
+  @Override
+  public void asyncCreate(String path, Object datat, CreateMode mode,
+      ZkAsyncCallbacks.CreateCallbackHandler cb) {
+    checkIfPathContainsShardingKey(path);
+    _innerSharedZkClient.asyncCreate(path, datat, mode, cb);
+  }
+
+  @Override
+  public void asyncSetData(String path, Object datat, int version,
+      ZkAsyncCallbacks.SetDataCallbackHandler cb) {
+    checkIfPathContainsShardingKey(path);
+    _innerSharedZkClient.asyncSetData(path, datat, version, cb);
+  }
+
+  @Override
+  public void asyncGetData(String path, ZkAsyncCallbacks.GetDataCallbackHandler cb) {
+    checkIfPathContainsShardingKey(path);
+    _innerSharedZkClient.asyncGetData(path, cb);
+  }
+
+  @Override
+  public void asyncExists(String path, ZkAsyncCallbacks.ExistsCallbackHandler cb) {
+    checkIfPathContainsShardingKey(path);
+    _innerSharedZkClient.asyncExists(path, cb);
+  }
+
+  @Override
+  public void asyncDelete(String path, ZkAsyncCallbacks.DeleteCallbackHandler cb) {
+    checkIfPathContainsShardingKey(path);
+    _innerSharedZkClient.asyncDelete(path, cb);
+  }
+
+  @Override
+  public void watchForData(String path) {
+    checkIfPathContainsShardingKey(path);
+    _innerSharedZkClient.watchForData(path);
+  }
+
+  @Override
+  public List<String> watchForChilds(String path) {
+    checkIfPathContainsShardingKey(path);
+    return _innerSharedZkClient.watchForChilds(path);
+  }
+
+  @Override
+  public long getCreationTime(String path) {
+    checkIfPathContainsShardingKey(path);
+    return _innerSharedZkClient.getCreationTime(path);
+  }
+
+  @Override
+  public List<OpResult> multi(Iterable<Op> ops) {
+    return _innerSharedZkClient.multi(ops);
+  }
+
+  @Override
+  public boolean waitUntilConnected(long time, TimeUnit timeUnit) {
+    return _innerSharedZkClient.waitUntilConnected(time, timeUnit);
+  }
+
+  @Override
+  public String getServers() {
+    return _innerSharedZkClient.getServers();
+  }
+
+  @Override
+  public long getSessionId() {
+    return _innerSharedZkClient.getSessionId();
   }
 
   @Override
   public void close() {
-    super.close();
-    if (isClosed()) {
-      // Note that if register is not done while constructing, these private fields may not be init yet.
-      if (_connectionManager != null) {
-        _connectionManager.unregisterWatcher(this);
-      }
-      if (_onCloseCallback != null) {
-        _onCloseCallback.onClose();
-      }
-    }
+    _innerSharedZkClient.close();
   }
 
   @Override
-  public IZkConnection getConnection() {
-    if (isClosed()) {
-      return IDLE_CONNECTION;
-    }
-    return super.getConnection();
+  public boolean isClosed() {
+    return _innerSharedZkClient.isClosed();
   }
 
-  /**
-   * Since ZkConnection session is shared in this RealmAwareZkClient, do not create ephemeral node using a SharedZKClient.
-   */
   @Override
-  public String create(final String path, Object datat, final List<ACL> acl,
-      final CreateMode mode) {
-    if (mode.isEphemeral()) {
-      throw new UnsupportedOperationException(
-          "Create ephemeral nodes using a " + SharedZkClient.class.getSimpleName()
-              + " is not supported.");
-    }
-    return super.create(path, datat, acl, mode);
+  public byte[] serialize(Object data, String path) {
+    checkIfPathContainsShardingKey(path);
+    return _innerSharedZkClient.serialize(data, path);
+  }
+
+  @Override
+  public <T> T deserialize(byte[] data, String path) {
+    checkIfPathContainsShardingKey(path);
+    return _innerSharedZkClient.deserialize(data, path);
+  }
+
+  @Override
+  public void setZkSerializer(ZkSerializer zkSerializer) {
+    _innerSharedZkClient.setZkSerializer(zkSerializer);
+  }
+
+  @Override
+  public void setZkSerializer(PathBasedZkSerializer zkSerializer) {
+    _innerSharedZkClient.setZkSerializer(zkSerializer);
   }
 
   @Override
-  protected boolean isManagingZkConnection() {
-    return false;
+  public PathBasedZkSerializer getZkSerializer() {
+    return _innerSharedZkClient.getZkSerializer();
+  }
+
+  private void checkIfPathContainsShardingKey(String path) {
+    // TODO: replace with the singleton MetadataStoreRoutingData
+    try {
+      String zkRealmForPath = _metadataStoreRoutingData.getMetadataStoreRealm(path);
+      if (!_zkRealmAddress.equals(zkRealmForPath)) {
+        throw new IllegalArgumentException("Given path: " + path + "'s ZK realm: " + zkRealmForPath
+            + " does not match the ZK realm: " + _zkRealmAddress + " and sharding key: "
+            + _zkRealmShardingKey + " for this DedicatedZkClient!");
+      }
+    } catch (NoSuchElementException e) {
+      throw new IllegalArgumentException(
+          "Given path: " + path + " does not have a valid sharding key!");
+    }
   }
-}
+}
\ No newline at end of file
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/SharedZkClientFactory.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/SharedZkClientFactory.java
index 1801614..80c58bf 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/SharedZkClientFactory.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/SharedZkClientFactory.java
@@ -20,12 +20,18 @@ package org.apache.helix.zookeeper.impl.factory;
  */
 
 import java.util.HashMap;
+import java.util.List;
 
 import org.apache.helix.msdcommon.datamodel.MetadataStoreRoutingData;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
 import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.apache.helix.zookeeper.exception.ZkClientException;
 import org.apache.helix.zookeeper.impl.client.SharedZkClient;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
+import org.apache.helix.zookeeper.zkclient.IZkConnection;
+import org.apache.helix.zookeeper.zkclient.ZkConnection;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.ACL;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,6 +45,12 @@ public class SharedZkClientFactory extends HelixZkClientFactory {
   private final HashMap<HelixZkClient.ZkConnectionConfig, ZkConnectionManager>
       _connectionManagerPool = new HashMap<>();
 
+  /*
+   * Since we cannot really disconnect the ZkConnection, we need a dummy ZkConnection placeholder.
+   * This ensures the connection field is never null, even after the shared RealmAwareZkClient instance is closed, so as to avoid NPE.
+   */
+  private final static ZkConnection IDLE_CONNECTION = new ZkConnection("Dummy_ZkServers");
+
   protected SharedZkClientFactory() {
   }
 
@@ -47,9 +59,8 @@ public class SharedZkClientFactory extends HelixZkClientFactory {
       RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig,
       RealmAwareZkClient.RealmAwareZkClientConfig clientConfig,
       MetadataStoreRoutingData metadataStoreRoutingData) {
-    // TODO: Implement the logic
-    // Return an instance of SharedZkClient
-    return null;
+    // Note: the connectionManager-sharing logic lives inside SharedZkClient, similar to InnerSharedZkClient.
+    return new SharedZkClient(connectionConfig, clientConfig, metadataStoreRoutingData);
   }
 
   @Override
@@ -84,14 +95,14 @@ public class SharedZkClientFactory extends HelixZkClientFactory {
       if (zkConnectionManager == null) {
         throw new ZkClientException("Failed to create a connection manager in the pool to share.");
       }
-      LOG.info("Sharing ZkConnection {} to a new SharedZkClient.", connectionConfig.toString());
-      return new SharedZkClient(zkConnectionManager, clientConfig,
-          new SharedZkClient.OnCloseCallback() {
-            @Override
-            public void onClose() {
-              cleanupConnectionManager(zkConnectionManager);
-            }
-          });
+      LOG.info("Sharing ZkConnection {} to a new InnerSharedZkClient.",
+          connectionConfig.toString());
+      return new InnerSharedZkClient(zkConnectionManager, clientConfig, new OnCloseCallback() {
+        @Override
+        public void onClose() {
+          cleanupConnectionManager(zkConnectionManager);
+        }
+      });
     }
   }
 
@@ -128,4 +139,78 @@ public class SharedZkClientFactory extends HelixZkClientFactory {
     }
     return count;
   }
+
+  public interface OnCloseCallback {
+    /**
+     * Triggered after the SharedZkClient is closed.
+     */
+    void onClose();
+  }
+
+  /**
+   * NOTE: do NOT use this class directly. Please use SharedZkClientFactory to create an instance of SharedZkClient.
+   * InnerSharedZkClient is a ZkClient used by SharedZkClient to power ZK operations against a single ZK realm.
+   *
+   * NOTE2: the current InnerSharedZkClient replaces the original SharedZkClient. We intend to keep the behavior of the
+   * original SharedZkClient intact. (Think of it as renaming the original SharedZkClient to InnerSharedZkClient, which
+   * maintains backward compatibility.)
+   */
+  public static class InnerSharedZkClient extends ZkClient implements HelixZkClient {
+
+    private final OnCloseCallback _onCloseCallback;
+    private final ZkConnectionManager _connectionManager;
+
+    public InnerSharedZkClient(ZkConnectionManager connectionManager, ZkClientConfig clientConfig,
+        OnCloseCallback callback) {
+      super(connectionManager.getConnection(), 0, clientConfig.getOperationRetryTimeout(),
+          clientConfig.getZkSerializer(), clientConfig.getMonitorType(),
+          clientConfig.getMonitorKey(), clientConfig.getMonitorInstanceName(),
+          clientConfig.isMonitorRootPathOnly());
+      _connectionManager = connectionManager;
+      // Register this client as a watcher on the shared connection manager
+      _connectionManager.registerWatcher(this);
+      _onCloseCallback = callback;
+    }
+
+    @Override
+    public void close() {
+      super.close();
+      if (isClosed()) {
+        // Note: if registration was not completed during construction, these private fields may not be initialized yet.
+        if (_connectionManager != null) {
+          _connectionManager.unregisterWatcher(this);
+        }
+        if (_onCloseCallback != null) {
+          _onCloseCallback.onClose();
+        }
+      }
+    }
+
+    @Override
+    public IZkConnection getConnection() {
+      if (isClosed()) {
+        return IDLE_CONNECTION;
+      }
+      return super.getConnection();
+    }
+
+    /**
+     * Since the ZkConnection session is shared in this HelixZkClient, do not create ephemeral nodes using a SharedZkClient.
+     */
+    @Override
+    public String create(final String path, Object datat, final List<ACL> acl,
+        final CreateMode mode) {
+      if (mode.isEphemeral()) {
+        throw new UnsupportedOperationException(
+            "Create ephemeral nodes using " + SharedZkClient.class.getSimpleName()
+                + " is not supported.");
+      }
+      return super.create(path, datat, acl, mode);
+    }
+
+    @Override
+    protected boolean isManagingZkConnection() {
+      return false;
+    }
+  }
 }
diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/RealmAwareZkClientTestBase.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/RealmAwareZkClientTestBase.java
index cd74975..d500ce4 100644
--- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/RealmAwareZkClientTestBase.java
+++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/RealmAwareZkClientTestBase.java
@@ -38,16 +38,16 @@ import org.testng.annotations.Test;
 
 
 public abstract class RealmAwareZkClientTestBase extends ZkTestBase {
-  private static final String ZK_SHARDING_KEY_PREFIX = "/TEST_SHARDING_KEY";
-  private static final String TEST_VALID_PATH = ZK_SHARDING_KEY_PREFIX + "_" + 0 + "/a/b/c";
-  private static final String TEST_INVALID_PATH = ZK_SHARDING_KEY_PREFIX + "_invalid" + "/a/b/c";
+  protected static final String ZK_SHARDING_KEY_PREFIX = "/TEST_SHARDING_KEY";
+  protected static final String TEST_VALID_PATH = ZK_SHARDING_KEY_PREFIX + "_" + 0 + "/a/b/c";
+  protected static final String TEST_INVALID_PATH = ZK_SHARDING_KEY_PREFIX + "_invalid" + "/a/b/c";
 
   // <Realm, List of sharding keys> Mapping
   private static final Map<String, List<String>> RAW_ROUTING_DATA = new HashMap<>();
 
   // The following RealmAwareZkClientFactory is to be defined in subclasses
   protected RealmAwareZkClientFactory _realmAwareZkClientFactory;
-  private RealmAwareZkClient _realmAwareZkClient;
+  protected RealmAwareZkClient _realmAwareZkClient;
   private MetadataStoreRoutingData _metadataStoreRoutingData;
 
   @BeforeClass
diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestSharedZkClient.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestSharedZkClient.java
new file mode 100644
index 0000000..364f66f
--- /dev/null
+++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestSharedZkClient.java
@@ -0,0 +1,47 @@
+package org.apache.helix.zookeeper.impl.client;
+
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
+import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
+import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
+import org.apache.zookeeper.CreateMode;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestSharedZkClient extends RealmAwareZkClientTestBase {
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    super.beforeClass();
+    // Set the factory to SharedZkClientFactory
+    _realmAwareZkClientFactory = SharedZkClientFactory.getInstance();
+  }
+
+  @Test
+  public void testCreateEphemeralFailure() {
+    _realmAwareZkClient.setZkSerializer(new ZNRecordSerializer());
+
+    // Create a dummy ZNRecord
+    ZNRecord znRecord = new ZNRecord("DummyRecord");
+    znRecord.setSimpleField("Dummy", "Value");
+
+    // test createEphemeral should fail
+    try {
+      _realmAwareZkClient.createEphemeral(TEST_VALID_PATH);
+      Assert.fail(
+          "sharedReamlAwareZkClient is not expected to be able to create ephemeral node via createEphemeral");
+    } catch (UnsupportedOperationException e) {
+      // this is expected
+    }
+
+    // test that creating an ephemeral node via create also fails
+    try {
+      _realmAwareZkClient.create(TEST_VALID_PATH, znRecord, CreateMode.EPHEMERAL);
+      Assert.fail(
+          "sharedRealmAwareZkClient is not expected to be able to create ephmeral node via create");
+    } catch (UnsupportedOperationException e) {
+      // this is expected.
+    }
+  }
+}
\ No newline at end of file