You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2020/04/01 22:47:13 UTC

[helix] 18/49: Add DedicatedZkClient and update DedicatedZkClientFactory (#765)

This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch zooscalability
in repository https://gitbox.apache.org/repos/asf/helix.git

commit c132ace4e95cb56ff63a018fb748f7b1d217be96
Author: Hunter Lee <hu...@linkedin.com>
AuthorDate: Thu Feb 20 16:17:03 2020 -0800

    Add DedicatedZkClient and update DedicatedZkClientFactory (#765)
    
    As part of ZkClient API enhancement, we wish to add DedicatedZkClient, which is a wrapper of the raw ZkClient, that provides realm-aware access to ZooKeeper.
    
    Realm-aware in that it only performs requests whose path's Zk path sharding key belongs to the ZK realm it's connected to.
    
    Also, we need to modify DedicatedZkClientFactory so that users could use this factory to generate instances of DedicatedZkClient.
---
 zookeeper-api/pom.xml                              |  11 +
 .../api/factory/RealmAwareZkClientFactory.java     |  16 +-
 .../zookeeper/impl/client/DedicatedZkClient.java   | 473 +++++++++++++++++++++
 .../impl/factory/DedicatedZkClientFactory.java     |  19 +-
 .../impl/factory/SharedZkClientFactory.java        |   7 +-
 .../apache/helix/zookeeper/impl/ZkTestBase.java    | 148 +++++++
 .../impl/client/RealmAwareZkClientTestBase.java    | 163 +++++++
 .../impl/client/TestDedicatedZkClient.java         |  35 ++
 8 files changed, 854 insertions(+), 18 deletions(-)

diff --git a/zookeeper-api/pom.xml b/zookeeper-api/pom.xml
index 5ec1e56..91b448f 100644
--- a/zookeeper-api/pom.xml
+++ b/zookeeper-api/pom.xml
@@ -44,6 +44,11 @@ under the License.
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.helix</groupId>
+      <artifactId>metadata-store-directory-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.zookeeper</groupId>
       <artifactId>zookeeper</artifactId>
       <version>3.4.13</version>
@@ -79,6 +84,12 @@ under the License.
       <artifactId>testng</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+      <version>2.6</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <build>
     <resources>
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/factory/RealmAwareZkClientFactory.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/factory/RealmAwareZkClientFactory.java
index f68ffe4..8c1f7a3 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/factory/RealmAwareZkClientFactory.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/factory/RealmAwareZkClientFactory.java
@@ -19,6 +19,7 @@ package org.apache.helix.zookeeper.api.factory;
  * under the License.
  */
 
+import org.apache.helix.msdcommon.datamodel.MetadataStoreRoutingData;
 import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 
 
@@ -30,16 +31,25 @@ public interface RealmAwareZkClientFactory {
    * Build a RealmAwareZkClient using specified connection config and client config.
    * @param connectionConfig
    * @param clientConfig
+   * @param metadataStoreRoutingData
    * @return HelixZkClient
    */
+  // TODO: remove MetadataStoreRoutingData
   RealmAwareZkClient buildZkClient(RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig,
-      RealmAwareZkClient.RealmAwareZkClientConfig clientConfig);
+      RealmAwareZkClient.RealmAwareZkClientConfig clientConfig,
+      MetadataStoreRoutingData metadataStoreRoutingData);
 
   /**
    * Builds a RealmAwareZkClient using specified connection config and default client config.
    * @param connectionConfig
+   * @param metadataStoreRoutingData
    * @return RealmAwareZkClient
    */
-  RealmAwareZkClient buildZkClient(
-      RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig);
+  // TODO: remove MetadataStoreRoutingData
+  default RealmAwareZkClient buildZkClient(
+      RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig,
+      MetadataStoreRoutingData metadataStoreRoutingData) {
+    return buildZkClient(connectionConfig, new RealmAwareZkClient.RealmAwareZkClientConfig(),
+        metadataStoreRoutingData);
+  }
 }
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java
new file mode 100644
index 0000000..8352b2b
--- /dev/null
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java
@@ -0,0 +1,473 @@
+package org.apache.helix.zookeeper.impl.client;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.msdcommon.datamodel.MetadataStoreRoutingData;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.zkclient.DataUpdater;
+import org.apache.helix.zookeeper.zkclient.IZkChildListener;
+import org.apache.helix.zookeeper.zkclient.IZkConnection;
+import org.apache.helix.zookeeper.zkclient.IZkDataListener;
+import org.apache.helix.zookeeper.zkclient.ZkConnection;
+import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks;
+import org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener;
+import org.apache.helix.zookeeper.zkclient.serialize.PathBasedZkSerializer;
+import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.OpResult;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * NOTE: DO NOT USE THIS CLASS DIRECTLY. Use DedicatedZkClientFactory to create instances of DedicatedZkClient.
+ *
+ * An implementation of the RealmAwareZkClient interface.
+ * Supports CRUD, data change subscription, and ephemeral mode operations.
+ */
+public class DedicatedZkClient implements RealmAwareZkClient {
+  private static Logger LOG = LoggerFactory.getLogger(DedicatedZkClient.class);
+
+  private final ZkClient _rawZkClient;
+  private final MetadataStoreRoutingData _metadataStoreRoutingData;
+  private final String _zkRealmShardingKey;
+  private final String _zkRealmAddress;
+
+  // TODO: Remove MetadataStoreRoutingData from constructor
+  public DedicatedZkClient(RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig,
+      RealmAwareZkClient.RealmAwareZkClientConfig clientConfig,
+      MetadataStoreRoutingData metadataStoreRoutingData) {
+
+    if (connectionConfig == null) {
+      throw new IllegalArgumentException("RealmAwareZkConnectionConfig cannot be null!");
+    }
+    _zkRealmShardingKey = connectionConfig.getZkRealmShardingKey();
+
+    if (metadataStoreRoutingData == null) {
+      throw new IllegalArgumentException("MetadataStoreRoutingData cannot be null!");
+    }
+    _metadataStoreRoutingData = metadataStoreRoutingData;
+
+    // TODO: Get it from static map/singleton (HttpRoutingDataReader)
+    // Get the ZkRealm address based on the ZK path sharding key
+    String zkRealmAddress = _metadataStoreRoutingData.getMetadataStoreRealm(_zkRealmShardingKey);
+    if (zkRealmAddress == null || zkRealmAddress.isEmpty()) {
+      throw new IllegalArgumentException(
+          "ZK realm address for the given ZK realm sharding key is invalid! ZK realm address: "
+              + zkRealmAddress + " ZK realm sharding key: " + _zkRealmShardingKey);
+    }
+    _zkRealmAddress = zkRealmAddress;
+
+    // Create a ZK connection
+    IZkConnection zkConnection =
+        new ZkConnection(zkRealmAddress, connectionConfig.getSessionTimeout());
+
+    // Create a ZkClient
+    _rawZkClient = new ZkClient(zkConnection, (int) clientConfig.getConnectInitTimeout(),
+        clientConfig.getOperationRetryTimeout(), clientConfig.getZkSerializer(),
+        clientConfig.getMonitorType(), clientConfig.getMonitorKey(),
+        clientConfig.getMonitorInstanceName(), clientConfig.isMonitorRootPathOnly());
+  }
+
+  @Override
+  public List<String> subscribeChildChanges(String path, IZkChildListener listener) {
+    checkIfPathContainsShardingKey(path);
+    return _rawZkClient.subscribeChildChanges(path, listener);
+  }
+
+  @Override
+  public void unsubscribeChildChanges(String path, IZkChildListener listener) {
+    checkIfPathContainsShardingKey(path);
+    _rawZkClient.unsubscribeChildChanges(path, listener);
+  }
+
+  @Override
+  public void subscribeDataChanges(String path, IZkDataListener listener) {
+    checkIfPathContainsShardingKey(path);
+    _rawZkClient.subscribeDataChanges(path, listener);
+  }
+
+  @Override
+  public void unsubscribeDataChanges(String path, IZkDataListener listener) {
+    checkIfPathContainsShardingKey(path);
+    _rawZkClient.unsubscribeDataChanges(path, listener);
+  }
+
+  @Override
+  public void subscribeStateChanges(IZkStateListener listener) {
+    _rawZkClient.subscribeStateChanges(listener);
+  }
+
+  @Override
+  public void unsubscribeStateChanges(IZkStateListener listener) {
+    _rawZkClient.unsubscribeStateChanges(listener);
+  }
+
+  @Override
+  public void unsubscribeAll() {
+    _rawZkClient.unsubscribeAll();
+  }
+
+  @Override
+  public void createPersistent(String path) {
+    createPersistent(path, false);
+  }
+
+  @Override
+  public void createPersistent(String path, boolean createParents) {
+    createPersistent(path, createParents, ZooDefs.Ids.OPEN_ACL_UNSAFE);
+  }
+
+  @Override
+  public void createPersistent(String path, boolean createParents, List<ACL> acl) {
+    checkIfPathContainsShardingKey(path);
+    _rawZkClient.createPersistent(path, createParents, acl);
+  }
+
+  @Override
+  public void createPersistent(String path, Object data) {
+    create(path, data, CreateMode.PERSISTENT);
+  }
+
+  @Override
+  public void createPersistent(String path, Object data, List<ACL> acl) {
+    create(path, data, acl, CreateMode.PERSISTENT);
+  }
+
+  @Override
+  public String createPersistentSequential(String path, Object data) {
+    return create(path, data, CreateMode.PERSISTENT_SEQUENTIAL);
+  }
+
+  @Override
+  public String createPersistentSequential(String path, Object data, List<ACL> acl) {
+    return create(path, data, acl, CreateMode.PERSISTENT_SEQUENTIAL);
+  }
+
+  @Override
+  public void createEphemeral(String path) {
+    create(path, null, CreateMode.EPHEMERAL);
+  }
+
+  @Override
+  public void createEphemeral(String path, String sessionId) {
+    createEphemeral(path, null, sessionId);
+  }
+
+  @Override
+  public void createEphemeral(String path, List<ACL> acl) {
+    create(path, null, acl, CreateMode.EPHEMERAL);
+  }
+
+  @Override
+  public void createEphemeral(String path, List<ACL> acl, String sessionId) {
+    checkIfPathContainsShardingKey(path);
+    _rawZkClient.createEphemeral(path, acl, sessionId);
+  }
+
+  @Override
+  public String create(String path, Object data, CreateMode mode) {
+    return create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, mode);
+  }
+
+  @Override
+  public String create(String path, Object datat, List<ACL> acl, CreateMode mode) {
+    checkIfPathContainsShardingKey(path);
+    return _rawZkClient.create(path, datat, acl, mode);
+  }
+
+  @Override
+  public void createEphemeral(String path, Object data) {
+    create(path, data, CreateMode.EPHEMERAL);
+  }
+
+  @Override
+  public void createEphemeral(String path, Object data, String sessionId) {
+    checkIfPathContainsShardingKey(path);
+    _rawZkClient.createEphemeral(path, data, sessionId);
+  }
+
+  @Override
+  public void createEphemeral(String path, Object data, List<ACL> acl) {
+    create(path, data, acl, CreateMode.EPHEMERAL);
+  }
+
+  @Override
+  public void createEphemeral(String path, Object data, List<ACL> acl, String sessionId) {
+    checkIfPathContainsShardingKey(path);
+    _rawZkClient.createEphemeral(path, data, acl, sessionId);
+  }
+
+  @Override
+  public String createEphemeralSequential(String path, Object data) {
+    return create(path, data, CreateMode.EPHEMERAL_SEQUENTIAL);
+  }
+
+  @Override
+  public String createEphemeralSequential(String path, Object data, List<ACL> acl) {
+    return create(path, data, acl, CreateMode.EPHEMERAL_SEQUENTIAL);
+  }
+
+  @Override
+  public String createEphemeralSequential(String path, Object data, String sessionId) {
+    checkIfPathContainsShardingKey(path);
+    return _rawZkClient.createEphemeralSequential(path, data, sessionId);
+  }
+
+  @Override
+  public String createEphemeralSequential(String path, Object data, List<ACL> acl,
+      String sessionId) {
+    checkIfPathContainsShardingKey(path);
+    return _rawZkClient.createEphemeralSequential(path, data, acl, sessionId);
+  }
+
+  @Override
+  public List<String> getChildren(String path) {
+    checkIfPathContainsShardingKey(path);
+    return _rawZkClient.getChildren(path);
+  }
+
+  @Override
+  public int countChildren(String path) {
+    checkIfPathContainsShardingKey(path);
+    return _rawZkClient.countChildren(path);
+  }
+
+  @Override
+  public boolean exists(String path) {
+    checkIfPathContainsShardingKey(path);
+    return _rawZkClient.exists(path);
+  }
+
+  @Override
+  public Stat getStat(String path) {
+    checkIfPathContainsShardingKey(path);
+    return _rawZkClient.getStat(path);
+  }
+
+  @Override
+  public boolean waitUntilExists(String path, TimeUnit timeUnit, long time) {
+    checkIfPathContainsShardingKey(path);
+    return _rawZkClient.waitUntilExists(path, timeUnit, time);
+  }
+
+  @Override
+  public void deleteRecursively(String path) {
+    checkIfPathContainsShardingKey(path);
+    _rawZkClient.deleteRecursively(path);
+  }
+
+  @Override
+  public boolean delete(String path) {
+    checkIfPathContainsShardingKey(path);
+    return _rawZkClient.delete(path);
+  }
+
+  @Override
+  public <T> T readData(String path) {
+    return readData(path, false);
+  }
+
+  @Override
+  public <T> T readData(String path, boolean returnNullIfPathNotExists) {
+    checkIfPathContainsShardingKey(path);
+    return _rawZkClient.readData(path, returnNullIfPathNotExists);
+  }
+
+  @Override
+  public <T> T readData(String path, Stat stat) {
+    checkIfPathContainsShardingKey(path);
+    return _rawZkClient.readData(path, stat);
+  }
+
+  @Override
+  public <T> T readData(String path, Stat stat, boolean watch) {
+    checkIfPathContainsShardingKey(path);
+    return _rawZkClient.readData(path, stat, watch);
+  }
+
+  @Override
+  public <T> T readDataAndStat(String path, Stat stat, boolean returnNullIfPathNotExists) {
+    checkIfPathContainsShardingKey(path);
+    return _rawZkClient.readDataAndStat(path, stat, returnNullIfPathNotExists);
+  }
+
+  @Override
+  public void writeData(String path, Object object) {
+    writeData(path, object, -1);
+  }
+
+  @Override
+  public <T> void updateDataSerialized(String path, DataUpdater<T> updater) {
+    checkIfPathContainsShardingKey(path);
+    _rawZkClient.updateDataSerialized(path, updater);
+  }
+
+  @Override
+  public void writeData(String path, Object datat, int expectedVersion) {
+    writeDataReturnStat(path, datat, expectedVersion);
+  }
+
+  @Override
+  public Stat writeDataReturnStat(String path, Object datat, int expectedVersion) {
+    checkIfPathContainsShardingKey(path);
+    return _rawZkClient.writeDataReturnStat(path, datat, expectedVersion);
+  }
+
+  @Override
+  public Stat writeDataGetStat(String path, Object datat, int expectedVersion) {
+    return writeDataReturnStat(path, datat, expectedVersion);
+  }
+
+  @Override
+  public void asyncCreate(String path, Object datat, CreateMode mode,
+      ZkAsyncCallbacks.CreateCallbackHandler cb) {
+    checkIfPathContainsShardingKey(path);
+    _rawZkClient.asyncCreate(path, datat, mode, cb);
+  }
+
+  @Override
+  public void asyncSetData(String path, Object datat, int version,
+      ZkAsyncCallbacks.SetDataCallbackHandler cb) {
+    checkIfPathContainsShardingKey(path);
+    _rawZkClient.asyncSetData(path, datat, version, cb);
+  }
+
+  @Override
+  public void asyncGetData(String path, ZkAsyncCallbacks.GetDataCallbackHandler cb) {
+    checkIfPathContainsShardingKey(path);
+    _rawZkClient.asyncGetData(path, cb);
+  }
+
+  @Override
+  public void asyncExists(String path, ZkAsyncCallbacks.ExistsCallbackHandler cb) {
+    checkIfPathContainsShardingKey(path);
+    _rawZkClient.asyncExists(path, cb);
+  }
+
+  @Override
+  public void asyncDelete(String path, ZkAsyncCallbacks.DeleteCallbackHandler cb) {
+    checkIfPathContainsShardingKey(path);
+    _rawZkClient.asyncDelete(path, cb);
+  }
+
+  @Override
+  public void watchForData(String path) {
+    checkIfPathContainsShardingKey(path);
+    _rawZkClient.watchForData(path);
+  }
+
+  @Override
+  public List<String> watchForChilds(String path) {
+    checkIfPathContainsShardingKey(path);
+    return _rawZkClient.watchForChilds(path);
+  }
+
+  @Override
+  public long getCreationTime(String path) {
+    checkIfPathContainsShardingKey(path);
+    return _rawZkClient.getCreationTime(path);
+  }
+
+  @Override
+  public List<OpResult> multi(Iterable<Op> ops) {
+    return _rawZkClient.multi(ops);
+  }
+
+  @Override
+  public boolean waitUntilConnected(long time, TimeUnit timeUnit) {
+    return _rawZkClient.waitUntilConnected(time, timeUnit);
+  }
+
+  @Override
+  public String getServers() {
+    return _rawZkClient.getServers();
+  }
+
+  @Override
+  public long getSessionId() {
+    return _rawZkClient.getSessionId();
+  }
+
+  @Override
+  public void close() {
+    _rawZkClient.close();
+  }
+
+  @Override
+  public boolean isClosed() {
+    return _rawZkClient.isClosed();
+  }
+
+  @Override
+  public byte[] serialize(Object data, String path) {
+    checkIfPathContainsShardingKey(path);
+    return _rawZkClient.serialize(data, path);
+  }
+
+  @Override
+  public <T> T deserialize(byte[] data, String path) {
+    checkIfPathContainsShardingKey(path);
+    return _rawZkClient.deserialize(data, path);
+  }
+
+  @Override
+  public void setZkSerializer(ZkSerializer zkSerializer) {
+    _rawZkClient.setZkSerializer(zkSerializer);
+  }
+
+  @Override
+  public void setZkSerializer(PathBasedZkSerializer zkSerializer) {
+    _rawZkClient.setZkSerializer(zkSerializer);
+  }
+
+  @Override
+  public PathBasedZkSerializer getZkSerializer() {
+    return _rawZkClient.getZkSerializer();
+  }
+
+  /**
+   * Checks whether the given path belongs matches the ZK path sharding key this DedicatedZkClient is designated to at initialization.
+   * @param path
+   * @return
+   */
+  private void checkIfPathContainsShardingKey(String path) {
+    // TODO: replace with the singleton MetadataStoreRoutingData
+    try {
+      String zkRealmForPath = _metadataStoreRoutingData.getMetadataStoreRealm(path);
+      if (!_zkRealmAddress.equals(zkRealmForPath)) {
+        throw new IllegalArgumentException("Given path: " + path + "'s ZK realm: " + zkRealmForPath
+            + " does not match the ZK realm: " + _zkRealmAddress + " and sharding key: "
+            + _zkRealmShardingKey + " for this DedicatedZkClient!");
+      }
+    } catch (NoSuchElementException e) {
+      throw new IllegalArgumentException(
+          "Given path: " + path + " does not have a valid sharding key!");
+    }
+  }
+}
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/DedicatedZkClientFactory.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/DedicatedZkClientFactory.java
index 2695a5d..6694497 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/DedicatedZkClientFactory.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/DedicatedZkClientFactory.java
@@ -19,8 +19,10 @@ package org.apache.helix.zookeeper.impl.factory;
  * under the License.
  */
 
+import org.apache.helix.msdcommon.datamodel.MetadataStoreRoutingData;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
 import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.impl.client.DedicatedZkClient;
 import org.apache.helix.zookeeper.impl.client.ZkClient;
 
 
@@ -35,17 +37,9 @@ public class DedicatedZkClientFactory extends HelixZkClientFactory {
   @Override
   public RealmAwareZkClient buildZkClient(
       RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig,
-      RealmAwareZkClient.RealmAwareZkClientConfig clientConfig) {
-    // TODO: Implement the logic
-    // Return an instance of DedicatedZkClient
-    return null;
-  }
-
-  @Override
-  public RealmAwareZkClient buildZkClient(
-      RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig) {
-    // TODO: Implement the logic
-    return null;
+      RealmAwareZkClient.RealmAwareZkClientConfig clientConfig,
+      MetadataStoreRoutingData metadataStoreRoutingData) {
+    return new DedicatedZkClient(connectionConfig, clientConfig, metadataStoreRoutingData);
   }
 
   private static class SingletonHelper {
@@ -57,8 +51,7 @@ public class DedicatedZkClientFactory extends HelixZkClientFactory {
   }
 
   /**
-   * Build a Dedicated ZkClient based on connection config and client config
-   *
+   * Build a Dedicated ZkClient based on connection config and client config.
    * @param connectionConfig
    * @param clientConfig
    * @return
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/SharedZkClientFactory.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/SharedZkClientFactory.java
index a9b8e33..1801614 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/SharedZkClientFactory.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/SharedZkClientFactory.java
@@ -21,6 +21,7 @@ package org.apache.helix.zookeeper.impl.factory;
 
 import java.util.HashMap;
 
+import org.apache.helix.msdcommon.datamodel.MetadataStoreRoutingData;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
 import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.apache.helix.zookeeper.exception.ZkClientException;
@@ -44,7 +45,8 @@ public class SharedZkClientFactory extends HelixZkClientFactory {
   @Override
   public RealmAwareZkClient buildZkClient(
       RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig,
-      RealmAwareZkClient.RealmAwareZkClientConfig clientConfig) {
+      RealmAwareZkClient.RealmAwareZkClientConfig clientConfig,
+      MetadataStoreRoutingData metadataStoreRoutingData) {
     // TODO: Implement the logic
     // Return an instance of SharedZkClient
     return null;
@@ -52,7 +54,8 @@ public class SharedZkClientFactory extends HelixZkClientFactory {
 
   @Override
   public RealmAwareZkClient buildZkClient(
-      RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig) {
+      RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig,
+      MetadataStoreRoutingData metadataStoreRoutingData) {
     // TODO: Implement the logic
     return null;
   }
diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/ZkTestBase.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/ZkTestBase.java
new file mode 100644
index 0000000..10edaf4
--- /dev/null
+++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/ZkTestBase.java
@@ -0,0 +1,148 @@
+package org.apache.helix.zookeeper.impl;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.zookeeper.zkclient.IDefaultNameSpace;
+import org.apache.helix.zookeeper.zkclient.ZkServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
+
+
+/**
+ * Test base class for various integration tests with an in-memory ZooKeeper.
+ */
+public class ZkTestBase {
+  private static final Logger LOG = LoggerFactory.getLogger(ZkTestBase.class);
+  private static final MBeanServerConnection MBEAN_SERVER =
+      ManagementFactory.getPlatformMBeanServer();
+
+  // maven surefire-plugin's multiple ZK config keys
+  private static final String MULTI_ZK_PROPERTY_KEY = "multiZk";
+  private static final String NUM_ZK_PROPERTY_KEY = "numZk";
+
+  protected static final String ZK_PREFIX = "localhost:";
+  protected static final int ZK_START_PORT = 2127;
+
+  /*
+   * Multiple ZK references
+   */
+  // The following maps hold ZK connect string as keys
+  protected Map<String, ZkServer> _zkServerMap = new HashMap<>();
+  protected int _numZk = 1; // Initial value
+
+  @BeforeSuite
+  public void beforeSuite()
+      throws IOException {
+    // Due to ZOOKEEPER-2693 fix, we need to specify whitelist for execute zk commends
+    System.setProperty("zookeeper.4lw.commands.whitelist", "*");
+
+    // Set up in-memory ZooKeepers
+    setupZooKeepers();
+
+    // Clean up all JMX objects
+    for (ObjectName mbean : MBEAN_SERVER.queryNames(null, null)) {
+      try {
+        MBEAN_SERVER.unregisterMBean(mbean);
+      } catch (Exception e) {
+        // OK
+      }
+    }
+  }
+
+  @AfterSuite
+  public void afterSuite()
+      throws IOException {
+    // Clean up all JMX objects
+    for (ObjectName mbean : MBEAN_SERVER.queryNames(null, null)) {
+      try {
+        MBEAN_SERVER.unregisterMBean(mbean);
+      } catch (Exception e) {
+        // OK
+      }
+    }
+
+    // Shut down all ZkServers
+    _zkServerMap.values().forEach(ZkServer::shutdown);
+  }
+
+  private void setupZooKeepers() {
+    // If multi-ZooKeeper is enabled, start more ZKs. Otherwise, just set up one ZK
+    String multiZkConfig = System.getProperty(MULTI_ZK_PROPERTY_KEY);
+    if (multiZkConfig != null && multiZkConfig.equalsIgnoreCase(Boolean.TRUE.toString())) {
+      String numZkFromConfig = System.getProperty(NUM_ZK_PROPERTY_KEY);
+      if (numZkFromConfig != null) {
+        try {
+          _numZk = Math.max(Integer.parseInt(numZkFromConfig), _numZk);
+        } catch (Exception e) {
+          Assert.fail("Failed to parse the number of ZKs from config!");
+        }
+      } else {
+        Assert.fail("multiZk config is set but numZk config is missing!");
+      }
+    }
+
+    // Start "numZkFromConfigInt" ZooKeepers
+    for (int i = 0; i < _numZk; i++) {
+      String zkAddress = ZK_PREFIX + (ZK_START_PORT + i);
+      ZkServer zkServer = startZkServer(zkAddress);
+      _zkServerMap.put(zkAddress, zkServer);
+    }
+  }
+
+  /**
+   * Creates an in-memory ZK at the given ZK address.
+   * @param zkAddress
+   * @return
+   */
+  private ZkServer startZkServer(final String zkAddress) {
+    String zkDir = zkAddress.replace(':', '_');
+    final String logDir = "/tmp/" + zkDir + "/logs";
+    final String dataDir = "/tmp/" + zkDir + "/dataDir";
+
+    // Clean up local directory
+    try {
+      FileUtils.deleteDirectory(new File(dataDir));
+      FileUtils.deleteDirectory(new File(logDir));
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+
+    IDefaultNameSpace defaultNameSpace = zkClient -> {
+    };
+
+    int port = Integer.parseInt(zkAddress.substring(zkAddress.lastIndexOf(':') + 1));
+    ZkServer zkServer = new ZkServer(dataDir, logDir, defaultNameSpace, port);
+    zkServer.start();
+    return zkServer;
+  }
+}
diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/RealmAwareZkClientTestBase.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/RealmAwareZkClientTestBase.java
new file mode 100644
index 0000000..cd74975
--- /dev/null
+++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/RealmAwareZkClientTestBase.java
@@ -0,0 +1,163 @@
+package org.apache.helix.zookeeper.impl.client;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.msdcommon.datamodel.MetadataStoreRoutingData;
+import org.apache.helix.msdcommon.datamodel.TrieRoutingData;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.api.factory.RealmAwareZkClientFactory;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
+import org.apache.helix.zookeeper.impl.ZkTestBase;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public abstract class RealmAwareZkClientTestBase extends ZkTestBase {
+  private static final String ZK_SHARDING_KEY_PREFIX = "/TEST_SHARDING_KEY";
+  private static final String TEST_VALID_PATH = ZK_SHARDING_KEY_PREFIX + "_" + 0 + "/a/b/c";
+  private static final String TEST_INVALID_PATH = ZK_SHARDING_KEY_PREFIX + "_invalid" + "/a/b/c";
+
+  // <Realm, List of sharding keys> Mapping
+  private static final Map<String, List<String>> RAW_ROUTING_DATA = new HashMap<>();
+
+  // The following RealmAwareZkClientFactory is to be defined in subclasses
+  protected RealmAwareZkClientFactory _realmAwareZkClientFactory;
+  private RealmAwareZkClient _realmAwareZkClient;
+  private MetadataStoreRoutingData _metadataStoreRoutingData;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    // Populate RAW_ROUTING_DATA
+    for (int i = 0; i < _numZk; i++) {
+      List<String> shardingKeyList = new ArrayList<>();
+      shardingKeyList.add(ZK_SHARDING_KEY_PREFIX + "_" + i);
+      String realmName = ZK_PREFIX + (ZK_START_PORT + i);
+      RAW_ROUTING_DATA.put(realmName, shardingKeyList);
+    }
+
+    // Feed the raw routing data into TrieRoutingData to construct an in-memory representation of routing information
+    _metadataStoreRoutingData = new TrieRoutingData(RAW_ROUTING_DATA);
+  }
+
+  @AfterClass
+  public void afterClass() {
+    if (_realmAwareZkClient != null && !_realmAwareZkClient.isClosed()) {
+      _realmAwareZkClient.close();
+    }
+  }
+
+  /**
+   * 1. Create a RealmAwareZkClient with a non-existing sharding key (for which there is no valid ZK realm)
+   * -> This should fail with an exception
+   * 2. Create a RealmAwareZkClient with a valid sharding key
+   * -> This should pass
+   */
+  @Test
+  public void testRealmAwareZkClientCreation() {
+    // Create a RealmAwareZkClient
+    String invalidShardingKey = "InvalidShardingKey";
+    RealmAwareZkClient.RealmAwareZkClientConfig clientConfig =
+        new RealmAwareZkClient.RealmAwareZkClientConfig();
+
+    // Create a connection config with the invalid sharding key
+    RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig =
+        new RealmAwareZkClient.RealmAwareZkConnectionConfig(invalidShardingKey);
+
+    try {
+      _realmAwareZkClient = _realmAwareZkClientFactory
+          .buildZkClient(connectionConfig, clientConfig, _metadataStoreRoutingData);
+      Assert.fail("Should not succeed with an invalid sharding key!");
+    } catch (IllegalArgumentException e) {
+      // Expected
+    }
+
+    // Use a valid sharding key this time around
+    String validShardingKey = ZK_SHARDING_KEY_PREFIX + "_" + 0; // Use TEST_SHARDING_KEY_0
+    connectionConfig = new RealmAwareZkClient.RealmAwareZkConnectionConfig(validShardingKey);
+    _realmAwareZkClient = _realmAwareZkClientFactory
+        .buildZkClient(connectionConfig, clientConfig, _metadataStoreRoutingData);
+  }
+
+  /**
+   * Test the persistent create() call against a valid path and an invalid path.
+   * Valid path is one that belongs to the realm designated by the sharding key.
+   * Invalid path is one that does not belong to the realm designated by the sharding key.
+   */
+  @Test(dependsOnMethods = "testRealmAwareZkClientCreation")
+  public void testRealmAwareZkClientCreatePersistent() {
+    _realmAwareZkClient.setZkSerializer(new ZNRecordSerializer());
+
+    // Create a dummy ZNRecord
+    ZNRecord znRecord = new ZNRecord("DummyRecord");
+    znRecord.setSimpleField("Dummy", "Value");
+
+    // Test writing and reading against the validPath
+    _realmAwareZkClient.createPersistent(TEST_VALID_PATH, true);
+    _realmAwareZkClient.writeData(TEST_VALID_PATH, znRecord);
+    Assert.assertEquals(_realmAwareZkClient.readData(TEST_VALID_PATH), znRecord);
+
+    // Test writing and reading against the invalid path
+    try {
+      _realmAwareZkClient.createPersistent(TEST_INVALID_PATH, true);
+      Assert.fail("Create() should not succeed on an invalid path!");
+    } catch (IllegalArgumentException e) {
+      // Okay - expected
+    }
+  }
+
+  /**
+   * Test that exists() works on valid path and fails on invalid path.
+   */
+  @Test(dependsOnMethods = "testRealmAwareZkClientCreatePersistent")
+  public void testExists() {
+    Assert.assertTrue(_realmAwareZkClient.exists(TEST_VALID_PATH));
+
+    try {
+      _realmAwareZkClient.exists(TEST_INVALID_PATH);
+      Assert.fail("Exists() should not succeed on an invalid path!");
+    } catch (IllegalArgumentException e) {
+      // Okay - expected
+    }
+  }
+
+  /**
+   * Test that delete() works on valid path and fails on invalid path.
+   */
+  @Test(dependsOnMethods = "testExists")
+  public void testDelete() {
+    try {
+      _realmAwareZkClient.delete(TEST_INVALID_PATH);
+      Assert.fail("Exists() should not succeed on an invalid path!");
+    } catch (IllegalArgumentException e) {
+      // Okay - expected
+    }
+
+    Assert.assertTrue(_realmAwareZkClient.delete(TEST_VALID_PATH));
+    Assert.assertFalse(_realmAwareZkClient.exists(TEST_VALID_PATH));
+  }
+}
\ No newline at end of file
diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestDedicatedZkClient.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestDedicatedZkClient.java
new file mode 100644
index 0000000..8cf3f85
--- /dev/null
+++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestDedicatedZkClient.java
@@ -0,0 +1,35 @@
+package org.apache.helix.zookeeper.impl.client;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
+import org.testng.annotations.BeforeClass;
+
+
+public class TestDedicatedZkClient extends RealmAwareZkClientTestBase {
+
+  @BeforeClass
+  public void beforeClass()
+      throws Exception {
+    super.beforeClass();
+    // Set the factory to DedicatedZkClientFactory
+    _realmAwareZkClientFactory = DedicatedZkClientFactory.getInstance();
+  }
+}