You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2020/04/08 22:53:59 UTC

[helix] 29/50: Add FederatedZkClient (#789)

This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch zooscalability_merge
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 4d922a5f31e052d33fb836e81c25262260e40ba7
Author: Huizhi Lu <ih...@gmail.com>
AuthorDate: Sat Feb 29 09:45:23 2020 -0800

    Add FederatedZkClient (#789)
    
    As part of ZkClient API enhancement, we wish to add FederatedZkClient, which is a wrapper of the raw ZkClient, that provides realm-aware access to ZooKeeper.
    FederatedZkClient will internally maintain multiple ZooKeeper sessions connecting to different ZooKeeper realms on an as-needed basis and route requests to the appropriate ZooKeeper based on the ZK path sharding key. Ephemeral node creation is not supported.
---
 .../zookeeper/impl/client/FederatedZkClient.java   | 322 ++++++++++++++++-----
 .../apache/helix/zookeeper/impl/ZkTestBase.java    |  13 +-
 .../impl/client/TestFederatedZkClient.java         | 312 ++++++++++++++++++++
 3 files changed, 573 insertions(+), 74 deletions(-)

diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java
index 3925a6d..5f63408 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java
@@ -19,345 +19,533 @@ package org.apache.helix.zookeeper.impl.client;
  * under the License.
  */
 
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.helix.msdcommon.datamodel.MetadataStoreRoutingData;
 import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
 import org.apache.helix.zookeeper.zkclient.DataUpdater;
 import org.apache.helix.zookeeper.zkclient.IZkChildListener;
 import org.apache.helix.zookeeper.zkclient.IZkDataListener;
+import org.apache.helix.zookeeper.zkclient.ZkConnection;
 import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks;
-import org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener;
+import org.apache.helix.zookeeper.zkclient.IZkStateListener;
+import org.apache.helix.zookeeper.zkclient.serialize.BasicZkSerializer;
 import org.apache.helix.zookeeper.zkclient.serialize.PathBasedZkSerializer;
 import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.Op;
 import org.apache.zookeeper.OpResult;
+import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Implements and supports all ZK operations defined in interface {@link RealmAwareZkClient},
+ * except for session-aware operations such as creating ephemeral nodes, for which
+ * an {@link UnsupportedOperationException} will be thrown.
+ * <p>
+ * It acts as a single ZK client but will automatically route read/write/change subscription
+ * requests to the corresponding ZkClient with the help of metadata store directory service.
+ * It could connect to multiple ZK addresses and maintain a {@link ZkClient} for each ZK address.
+ * <p>
+ * Note: each Zk realm has its own event queue to handle listeners. So listeners from different ZK
+ * realms could be handled concurrently because listeners of a ZK realm are handled in its own
+ * queue. The concurrency of listeners should be aware of when implementing listeners for different
+ * ZK realms. The users should use thread-safe data structures if they wish to handle change
+ * callbacks.
+ */
+public class FederatedZkClient implements RealmAwareZkClient {
+  private static final Logger LOG = LoggerFactory.getLogger(FederatedZkClient.class);
 
+  private static final String FEDERATED_ZK_CLIENT = FederatedZkClient.class.getSimpleName();
+  private static final String DEDICATED_ZK_CLIENT_FACTORY =
+      DedicatedZkClientFactory.class.getSimpleName();
+
+  private final MetadataStoreRoutingData _metadataStoreRoutingData;
+  private final RealmAwareZkClient.RealmAwareZkClientConfig _clientConfig;
+
+  // ZK realm -> ZkClient
+  private final Map<String, ZkClient> _zkRealmToZkClientMap;
+
+  private volatile boolean _isClosed;
+  private PathBasedZkSerializer _pathBasedZkSerializer;
+
+  // TODO: support capacity of ZkClient number in one FederatedZkClient and do garbage collection.
+  public FederatedZkClient(RealmAwareZkClient.RealmAwareZkClientConfig clientConfig,
+      MetadataStoreRoutingData metadataStoreRoutingData) {
+    if (metadataStoreRoutingData == null) {
+      throw new IllegalArgumentException("MetadataStoreRoutingData cannot be null!");
+    }
+    if (clientConfig == null) {
+      throw new IllegalArgumentException("Client config cannot be null!");
+    }
+
+    _isClosed = false;
+    _clientConfig = clientConfig;
+    _pathBasedZkSerializer = clientConfig.getZkSerializer();
+    _metadataStoreRoutingData = metadataStoreRoutingData;
+    _zkRealmToZkClientMap = new ConcurrentHashMap<>();
+  }
 
-public class FederatedZkClient implements RealmAwareZkClient {
   @Override
   public List<String> subscribeChildChanges(String path, IZkChildListener listener) {
-    return null;
+    return getZkClient(path).subscribeChildChanges(path, listener);
   }
 
   @Override
   public void unsubscribeChildChanges(String path, IZkChildListener listener) {
-
+    getZkClient(path).unsubscribeChildChanges(path, listener);
   }
 
   @Override
   public void subscribeDataChanges(String path, IZkDataListener listener) {
-
+    getZkClient(path).subscribeDataChanges(path, listener);
   }
 
   @Override
   public void unsubscribeDataChanges(String path, IZkDataListener listener) {
-
+    getZkClient(path).unsubscribeDataChanges(path, listener);
   }
 
   @Override
   public void subscribeStateChanges(IZkStateListener listener) {
-
+    throwUnsupportedOperationException();
   }
 
   @Override
   public void unsubscribeStateChanges(IZkStateListener listener) {
+    throwUnsupportedOperationException();
+  }
 
+  @Override
+  public void subscribeStateChanges(
+      org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener listener) {
+    throwUnsupportedOperationException();
   }
 
   @Override
-  public void unsubscribeAll() {
+  public void unsubscribeStateChanges(
+      org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener listener) {
+    throwUnsupportedOperationException();
+  }
 
+  @Override
+  public void unsubscribeAll() {
+    _zkRealmToZkClientMap.values().forEach(ZkClient::unsubscribeAll);
   }
 
   @Override
   public void createPersistent(String path) {
-
+    createPersistent(path, false);
   }
 
   @Override
   public void createPersistent(String path, boolean createParents) {
-
+    createPersistent(path, createParents, ZooDefs.Ids.OPEN_ACL_UNSAFE);
   }
 
   @Override
   public void createPersistent(String path, boolean createParents, List<ACL> acl) {
-
+    getZkClient(path).createPersistent(path, createParents, acl);
   }
 
   @Override
   public void createPersistent(String path, Object data) {
-
+    create(path, data, CreateMode.PERSISTENT);
   }
 
   @Override
   public void createPersistent(String path, Object data, List<ACL> acl) {
-
+    create(path, data, acl, CreateMode.PERSISTENT);
   }
 
   @Override
   public String createPersistentSequential(String path, Object data) {
-    return null;
+    return create(path, data, CreateMode.PERSISTENT_SEQUENTIAL);
   }
 
   @Override
   public String createPersistentSequential(String path, Object data, List<ACL> acl) {
-    return null;
+    return create(path, data, acl, CreateMode.PERSISTENT_SEQUENTIAL);
   }
 
   @Override
   public void createEphemeral(String path) {
-
+    create(path, null, CreateMode.EPHEMERAL);
   }
 
   @Override
   public void createEphemeral(String path, String sessionId) {
-
+    createEphemeral(path, null, sessionId);
   }
 
   @Override
   public void createEphemeral(String path, List<ACL> acl) {
-
+    create(path, null, acl, CreateMode.EPHEMERAL);
   }
 
   @Override
   public void createEphemeral(String path, List<ACL> acl, String sessionId) {
-
+    create(path, null, acl, CreateMode.EPHEMERAL, sessionId);
   }
 
   @Override
   public String create(String path, Object data, CreateMode mode) {
-    return null;
+    return create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, mode);
   }
 
   @Override
-  public String create(String path, Object datat, List<ACL> acl, CreateMode mode) {
-    return null;
+  public String create(String path, Object data, List<ACL> acl, CreateMode mode) {
+    return create(path, data, acl, mode, null);
   }
 
   @Override
   public void createEphemeral(String path, Object data) {
-
+    create(path, data, CreateMode.EPHEMERAL);
   }
 
   @Override
   public void createEphemeral(String path, Object data, String sessionId) {
-
+    create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, sessionId);
   }
 
   @Override
   public void createEphemeral(String path, Object data, List<ACL> acl) {
-
+    create(path, data, acl, CreateMode.EPHEMERAL);
   }
 
   @Override
   public void createEphemeral(String path, Object data, List<ACL> acl, String sessionId) {
-
+    create(path, data, acl, CreateMode.EPHEMERAL, sessionId);
   }
 
   @Override
   public String createEphemeralSequential(String path, Object data) {
-    return null;
+    return create(path, data, CreateMode.EPHEMERAL_SEQUENTIAL);
   }
 
   @Override
   public String createEphemeralSequential(String path, Object data, List<ACL> acl) {
-    return null;
+    return create(path, data, acl, CreateMode.EPHEMERAL_SEQUENTIAL);
   }
 
   @Override
   public String createEphemeralSequential(String path, Object data, String sessionId) {
-    return null;
+    return create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
+        sessionId);
   }
 
   @Override
   public String createEphemeralSequential(String path, Object data, List<ACL> acl,
       String sessionId) {
-    return null;
+    return create(path, data, acl, CreateMode.EPHEMERAL_SEQUENTIAL, sessionId);
   }
 
   @Override
   public List<String> getChildren(String path) {
-    return null;
+    return getZkClient(path).getChildren(path);
   }
 
   @Override
   public int countChildren(String path) {
-    return 0;
+    return getZkClient(path).countChildren(path);
   }
 
   @Override
   public boolean exists(String path) {
-    return false;
+    return getZkClient(path).exists(path);
   }
 
   @Override
   public Stat getStat(String path) {
-    return null;
+    return getZkClient(path).getStat(path);
   }
 
   @Override
   public boolean waitUntilExists(String path, TimeUnit timeUnit, long time) {
-    return false;
+    return getZkClient(path).waitUntilExists(path, timeUnit, time);
   }
 
   @Override
   public void deleteRecursively(String path) {
-
+    getZkClient(path).deleteRecursively(path);
   }
 
   @Override
   public boolean delete(String path) {
-    return false;
+    return getZkClient(path).delete(path);
   }
 
   @Override
+  @SuppressWarnings("unchecked")
   public <T> T readData(String path) {
-    return null;
+    return (T) readData(path, false);
   }
 
   @Override
   public <T> T readData(String path, boolean returnNullIfPathNotExists) {
-    return null;
+    return getZkClient(path).readData(path, returnNullIfPathNotExists);
   }
 
   @Override
   public <T> T readData(String path, Stat stat) {
-    return null;
+    return getZkClient(path).readData(path, stat);
   }
 
   @Override
   public <T> T readData(String path, Stat stat, boolean watch) {
-    return null;
+    return getZkClient(path).readData(path, stat, watch);
   }
 
   @Override
   public <T> T readDataAndStat(String path, Stat stat, boolean returnNullIfPathNotExists) {
-    return null;
+    return getZkClient(path).readData(path, stat, returnNullIfPathNotExists);
   }
 
   @Override
   public void writeData(String path, Object object) {
-
+    writeData(path, object, -1);
   }
 
   @Override
   public <T> void updateDataSerialized(String path, DataUpdater<T> updater) {
-
+    getZkClient(path).updateDataSerialized(path, updater);
   }
 
   @Override
-  public void writeData(String path, Object datat, int expectedVersion) {
-
+  public void writeData(String path, Object data, int expectedVersion) {
+    writeDataReturnStat(path, data, expectedVersion);
   }
 
   @Override
-  public Stat writeDataReturnStat(String path, Object datat, int expectedVersion) {
-    return null;
+  public Stat writeDataReturnStat(String path, Object data, int expectedVersion) {
+    return getZkClient(path).writeDataReturnStat(path, data, expectedVersion);
   }
 
   @Override
-  public Stat writeDataGetStat(String path, Object datat, int expectedVersion) {
-    return null;
+  public Stat writeDataGetStat(String path, Object data, int expectedVersion) {
+    return writeDataReturnStat(path, data, expectedVersion);
   }
 
   @Override
-  public void asyncCreate(String path, Object datat, CreateMode mode,
+  public void asyncCreate(String path, Object data, CreateMode mode,
       ZkAsyncCallbacks.CreateCallbackHandler cb) {
-
+    getZkClient(path).asyncCreate(path, data, mode, cb);
   }
 
   @Override
-  public void asyncSetData(String path, Object datat, int version,
+  public void asyncSetData(String path, Object data, int version,
       ZkAsyncCallbacks.SetDataCallbackHandler cb) {
-
+    getZkClient(path).asyncSetData(path, data, version, cb);
   }
 
   @Override
   public void asyncGetData(String path, ZkAsyncCallbacks.GetDataCallbackHandler cb) {
-
+    getZkClient(path).asyncGetData(path, cb);
   }
 
   @Override
   public void asyncExists(String path, ZkAsyncCallbacks.ExistsCallbackHandler cb) {
-
+    getZkClient(path).asyncExists(path, cb);
   }
 
   @Override
   public void asyncDelete(String path, ZkAsyncCallbacks.DeleteCallbackHandler cb) {
-
+    getZkClient(path).asyncDelete(path, cb);
   }
 
   @Override
   public void watchForData(String path) {
-
+    getZkClient(path).watchForData(path);
   }
 
   @Override
   public List<String> watchForChilds(String path) {
-    return null;
+    return getZkClient(path).watchForChilds(path);
   }
 
   @Override
   public long getCreationTime(String path) {
-    return 0;
+    return getZkClient(path).getCreationTime(path);
   }
 
   @Override
   public List<OpResult> multi(Iterable<Op> ops) {
+    throwUnsupportedOperationException();
     return null;
   }
 
   @Override
   public boolean waitUntilConnected(long time, TimeUnit timeUnit) {
+    throwUnsupportedOperationException();
     return false;
   }
 
   @Override
   public String getServers() {
+    throwUnsupportedOperationException();
     return null;
   }
 
   @Override
   public long getSessionId() {
-    return 0;
+    // Session-aware is unsupported.
+    throwUnsupportedOperationException();
+    return 0L;
   }
 
   @Override
   public void close() {
+    if (isClosed()) {
+      return;
+    }
 
+    _isClosed = true;
+
+    synchronized (_zkRealmToZkClientMap) {
+      Iterator<Map.Entry<String, ZkClient>> iterator = _zkRealmToZkClientMap.entrySet().iterator();
+
+      while (iterator.hasNext()) {
+        Map.Entry<String, ZkClient> entry = iterator.next();
+        String zkRealm = entry.getKey();
+        ZkClient zkClient = entry.getValue();
+
+        // Catch any exception from ZkClient's close() to avoid that there is leakage of
+        // remaining unclosed ZkClient.
+        try {
+          zkClient.close();
+        } catch (Exception e) {
+          LOG.error("Exception thrown when closing ZkClient for ZkRealm: {}!", zkRealm, e);
+        }
+        iterator.remove();
+      }
+    }
+
+    LOG.info("{} is successfully closed.", FEDERATED_ZK_CLIENT);
   }
 
   @Override
   public boolean isClosed() {
-    return false;
+    return _isClosed;
   }
 
   @Override
   public byte[] serialize(Object data, String path) {
-    return new byte[0];
+    return getZkClient(path).serialize(data, path);
   }
 
   @Override
   public <T> T deserialize(byte[] data, String path) {
-    return null;
+    return getZkClient(path).deserialize(data, path);
   }
 
   @Override
   public void setZkSerializer(ZkSerializer zkSerializer) {
-
+    _pathBasedZkSerializer = new BasicZkSerializer(zkSerializer);
+    _zkRealmToZkClientMap.values()
+        .forEach(zkClient -> zkClient.setZkSerializer(_pathBasedZkSerializer));
   }
 
   @Override
   public void setZkSerializer(PathBasedZkSerializer zkSerializer) {
-
+    _pathBasedZkSerializer = zkSerializer;
+    _zkRealmToZkClientMap.values().forEach(zkClient -> zkClient.setZkSerializer(zkSerializer));
   }
 
   @Override
   public PathBasedZkSerializer getZkSerializer() {
-    return null;
+    return _pathBasedZkSerializer;
+  }
+
+  private String create(final String path, final Object dataObject, final List<ACL> acl,
+      final CreateMode mode, final String expectedSessionId) {
+    if (mode.isEphemeral()) {
+      throwUnsupportedOperationException();
+    }
+
+    // Create mode is not session-aware, so the node does not have to be created
+    // by the expectedSessionId.
+    return getZkClient(path).create(path, dataObject, acl, mode);
+  }
+
+  private ZkClient getZkClient(String path) {
+    // If FederatedZkClient is closed, should not return ZkClient.
+    checkClosedState();
+
+    String zkRealm = getZkRealm(path);
+
+    // Use this zkClient reference to protect the returning zkClient from being null because of
+    // race condition. Once we get the reference, even _zkRealmToZkClientMap is cleared by closed(),
+    // this zkClient is not null which guarantees the returned value not null.
+    ZkClient zkClient = _zkRealmToZkClientMap.get(zkRealm);
+
+    if (zkClient == null) {
+      // 1. Synchronized to avoid creating duplicate ZkClient for the same ZkRealm.
+      // 2. Synchronized with close() to avoid creating new ZkClient when all ZkClients are
+      // being closed and _zkRealmToZkClientMap is being cleared.
+      synchronized (_zkRealmToZkClientMap) {
+        // Because of potential race condition: thread B to get ZkClient could be blocked by this
+        // synchronized, while thread A is executing closed() in its synchronized block. So thread B
+        // could still enter this synchronized block once A completes executing closed() and
+        // releases the synchronized lock.
+        // Check closed state again to avoid creating a new ZkClient after FederatedZkClient
+        // is already closed.
+        checkClosedState();
+
+        if (!_zkRealmToZkClientMap.containsKey(zkRealm)) {
+          zkClient = createZkClient(zkRealm);
+          _zkRealmToZkClientMap.put(zkRealm, zkClient);
+        } else {
+          zkClient = _zkRealmToZkClientMap.get(zkRealm);
+        }
+      }
+    }
+
+    return zkClient;
+  }
+
+  private String getZkRealm(String path) {
+    String zkRealm;
+    try {
+      zkRealm = _metadataStoreRoutingData.getMetadataStoreRealm(path);
+    } catch (NoSuchElementException ex) {
+      throw new NoSuchElementException("Cannot find ZK realm for the path: " + path);
+    }
+
+    if (zkRealm == null || zkRealm.isEmpty()) {
+      throw new NoSuchElementException("Cannot find ZK realm for the path: " + path);
+    }
+
+    return zkRealm;
+  }
+
+  private ZkClient createZkClient(String zkAddress) {
+    LOG.debug("Creating ZkClient for realm: {}.", zkAddress);
+    return new ZkClient(new ZkConnection(zkAddress), (int) _clientConfig.getConnectInitTimeout(),
+        _clientConfig.getOperationRetryTimeout(), _pathBasedZkSerializer,
+        _clientConfig.getMonitorType(), _clientConfig.getMonitorKey(),
+        _clientConfig.getMonitorInstanceName(), _clientConfig.isMonitorRootPathOnly());
+  }
+
+  private void checkClosedState() {
+    if (isClosed()) {
+      throw new IllegalStateException(FEDERATED_ZK_CLIENT + " is closed!");
+    }
+  }
+
+  private void throwUnsupportedOperationException() {
+    throw new UnsupportedOperationException(
+        "Session-aware operation is not supported by " + FEDERATED_ZK_CLIENT
+            + ". Instead, please use " + DEDICATED_ZK_CLIENT_FACTORY
+            + " to create a dedicated RealmAwareZkClient for this operation.");
   }
 }
diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/ZkTestBase.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/ZkTestBase.java
index 10edaf4..7e59652 100644
--- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/ZkTestBase.java
+++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/ZkTestBase.java
@@ -57,12 +57,11 @@ public class ZkTestBase {
    * Multiple ZK references
    */
   // The following maps hold ZK connect string as keys
-  protected Map<String, ZkServer> _zkServerMap = new HashMap<>();
-  protected int _numZk = 1; // Initial value
+  protected final Map<String, ZkServer> _zkServerMap = new HashMap<>();
+  protected static int _numZk = 1; // Initial value
 
   @BeforeSuite
-  public void beforeSuite()
-      throws IOException {
+  public void beforeSuite() throws IOException {
     // Due to ZOOKEEPER-2693 fix, we need to specify whitelist for execute zk commends
     System.setProperty("zookeeper.4lw.commands.whitelist", "*");
 
@@ -80,8 +79,7 @@ public class ZkTestBase {
   }
 
   @AfterSuite
-  public void afterSuite()
-      throws IOException {
+  public void afterSuite() throws IOException {
     // Clean up all JMX objects
     for (ObjectName mbean : MBEAN_SERVER.queryNames(null, null)) {
       try {
@@ -124,7 +122,7 @@ public class ZkTestBase {
    * @param zkAddress
    * @return
    */
-  private ZkServer startZkServer(final String zkAddress) {
+  protected ZkServer startZkServer(final String zkAddress) {
     String zkDir = zkAddress.replace(':', '_');
     final String logDir = "/tmp/" + zkDir + "/logs";
     final String dataDir = "/tmp/" + zkDir + "/dataDir";
@@ -142,6 +140,7 @@ public class ZkTestBase {
 
     int port = Integer.parseInt(zkAddress.substring(zkAddress.lastIndexOf(':') + 1));
     ZkServer zkServer = new ZkServer(dataDir, logDir, defaultNameSpace, port);
+    System.out.println("Starting ZK server at " + zkAddress);
     zkServer.start();
     return zkServer;
   }
diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestFederatedZkClient.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestFederatedZkClient.java
new file mode 100644
index 0000000..5801690
--- /dev/null
+++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestFederatedZkClient.java
@@ -0,0 +1,312 @@
+package org.apache.helix.zookeeper.impl.client;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.msdcommon.datamodel.TrieRoutingData;
+import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
+import org.apache.helix.zookeeper.impl.ZkTestBase;
+import org.apache.helix.zookeeper.zkclient.IZkStateListener;
+import org.apache.helix.zookeeper.zkclient.ZkServer;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestFederatedZkClient extends ZkTestBase {
+  private static final String TEST_SHARDING_KEY_PREFIX = "/test_sharding_key_";
+  private static final String TEST_REALM_ONE_VALID_PATH = TEST_SHARDING_KEY_PREFIX + "1/a/b/c";
+  private static final String TEST_REALM_TWO_VALID_PATH = TEST_SHARDING_KEY_PREFIX + "2/x/y/z";
+  private static final String TEST_INVALID_PATH = TEST_SHARDING_KEY_PREFIX + "invalid/a/b/c";
+  private static final String UNSUPPORTED_OPERATION_MESSAGE =
+      "Session-aware operation is not supported by FederatedZkClient.";
+
+  private RealmAwareZkClient _realmAwareZkClient;
+  // Need to start an extra ZK server for multi-realm test, if only one ZK server is running.
+  private String _extraZkRealm;
+  private ZkServer _extraZkServer;
+
+  @BeforeClass
+  public void beforeClass() throws InvalidRoutingDataException {
+    System.out.println("Starting " + TestFederatedZkClient.class.getSimpleName());
+
+    // Populate rawRoutingData
+    // <Realm, List of sharding keys> Mapping
+    Map<String, List<String>> rawRoutingData = new HashMap<>();
+    for (int i = 0; i < _numZk; i++) {
+      List<String> shardingKeyList = Collections.singletonList(TEST_SHARDING_KEY_PREFIX + (i + 1));
+      String realmName = ZK_PREFIX + (ZK_START_PORT + i);
+      rawRoutingData.put(realmName, shardingKeyList);
+    }
+
+    if (rawRoutingData.size() < 2) {
+      System.out.println("There is only one ZK realm. Starting one more ZK to test multi-realm.");
+      _extraZkRealm = ZK_PREFIX + (ZK_START_PORT + 1);
+      _extraZkServer = startZkServer(_extraZkRealm);
+      // RealmTwo's sharding key: /test_sharding_key_2
+      List<String> shardingKeyList = Collections.singletonList(TEST_SHARDING_KEY_PREFIX + "2");
+      rawRoutingData.put(_extraZkRealm, shardingKeyList);
+    }
+
+    // Feed the raw routing data into TrieRoutingData to construct an in-memory representation
+    // of routing information.
+    _realmAwareZkClient = new FederatedZkClient(new RealmAwareZkClient.RealmAwareZkClientConfig(),
+        new TrieRoutingData(rawRoutingData));
+  }
+
+  @AfterClass
+  public void afterClass() {
+    // Close it as it is created in before class.
+    _realmAwareZkClient.close();
+
+    // Close the extra zk server.
+    if (_extraZkServer != null) {
+      _extraZkServer.shutdown();
+    }
+
+    System.out.println("Ending " + TestFederatedZkClient.class.getSimpleName());
+  }
+
+  /*
+   * Tests that an unsupported operation should throw an UnsupportedOperationException.
+   */
+  @Test
+  public void testUnsupportedOperations() {
+    // Test creating ephemeral.
+    try {
+      _realmAwareZkClient.create(TEST_REALM_ONE_VALID_PATH, "Hello", CreateMode.EPHEMERAL);
+      Assert.fail("Ephemeral node should not be created.");
+    } catch (UnsupportedOperationException ex) {
+      Assert.assertTrue(ex.getMessage().startsWith(UNSUPPORTED_OPERATION_MESSAGE));
+    }
+
+    // Test creating ephemeral sequential.
+    try {
+      _realmAwareZkClient
+          .create(TEST_REALM_ONE_VALID_PATH, "Hello", CreateMode.EPHEMERAL_SEQUENTIAL);
+      Assert.fail("Ephemeral node should not be created.");
+    } catch (UnsupportedOperationException ex) {
+      Assert.assertTrue(ex.getMessage().startsWith(UNSUPPORTED_OPERATION_MESSAGE));
+    }
+
+    List<Op> ops = Arrays.asList(
+        Op.create(TEST_REALM_ONE_VALID_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+            CreateMode.PERSISTENT), Op.delete(TEST_REALM_ONE_VALID_PATH, -1));
+    try {
+      _realmAwareZkClient.multi(ops);
+      Assert.fail("multi() should not be supported.");
+    } catch (UnsupportedOperationException ex) {
+      Assert.assertTrue(ex.getMessage().startsWith(UNSUPPORTED_OPERATION_MESSAGE));
+    }
+
+    try {
+      _realmAwareZkClient.getSessionId();
+      Assert.fail("getSessionId() should not be supported.");
+    } catch (UnsupportedOperationException ex) {
+      Assert.assertTrue(ex.getMessage().startsWith(UNSUPPORTED_OPERATION_MESSAGE));
+    }
+
+    try {
+      _realmAwareZkClient.getServers();
+      Assert.fail("getServers() should not be supported.");
+    } catch (UnsupportedOperationException ex) {
+      Assert.assertTrue(ex.getMessage().startsWith(UNSUPPORTED_OPERATION_MESSAGE));
+    }
+
+    try {
+      _realmAwareZkClient.waitUntilConnected(5L, TimeUnit.SECONDS);
+      Assert.fail("getServers() should not be supported.");
+    } catch (UnsupportedOperationException ex) {
+      Assert.assertTrue(ex.getMessage().startsWith(UNSUPPORTED_OPERATION_MESSAGE));
+    }
+
+    // Test state change subscription.
+    IZkStateListener listener = new IZkStateListener() {
+      @Override
+      public void handleStateChanged(Watcher.Event.KeeperState state) {
+        System.out.println("Handle new state: " + state);
+      }
+
+      @Override
+      public void handleNewSession(String sessionId) {
+        System.out.println("Handle new session: " + sessionId);
+      }
+
+      @Override
+      public void handleSessionEstablishmentError(Throwable error) {
+        System.out.println("Handle session establishment error: " + error);
+      }
+    };
+
+    try {
+      _realmAwareZkClient.subscribeStateChanges(listener);
+      Assert.fail("getServers() should not be supported.");
+    } catch (UnsupportedOperationException ex) {
+      Assert.assertTrue(ex.getMessage().startsWith(UNSUPPORTED_OPERATION_MESSAGE));
+    }
+
+    try {
+      _realmAwareZkClient.unsubscribeStateChanges(listener);
+      Assert.fail("getServers() should not be supported.");
+    } catch (UnsupportedOperationException ex) {
+      Assert.assertTrue(ex.getMessage().startsWith(UNSUPPORTED_OPERATION_MESSAGE));
+    }
+  }
+
+  /*
+   * Tests the persistent create() call against a valid path and an invalid path.
+   * Valid path is one that belongs to the realm designated by the sharding key.
+   * Invalid path is one that does not belong to the realm designated by the sharding key.
+   */
+  @Test(dependsOnMethods = "testUnsupportedOperations")
+  public void testCreatePersistent() {
+    _realmAwareZkClient.setZkSerializer(new ZNRecordSerializer());
+
+    // Create a dummy ZNRecord
+    ZNRecord znRecord = new ZNRecord("DummyRecord");
+    znRecord.setSimpleField("Dummy", "Value");
+
+    // Test writing and reading against the validPath
+    _realmAwareZkClient.createPersistent(TEST_REALM_ONE_VALID_PATH, true);
+    _realmAwareZkClient.writeData(TEST_REALM_ONE_VALID_PATH, znRecord);
+    Assert.assertEquals(_realmAwareZkClient.readData(TEST_REALM_ONE_VALID_PATH), znRecord);
+
+    // Test writing and reading against the invalid path
+    try {
+      _realmAwareZkClient.createPersistent(TEST_INVALID_PATH, true);
+      Assert.fail("Create() should not succeed on an invalid path!");
+    } catch (NoSuchElementException ex) {
+      Assert
+          .assertEquals(ex.getMessage(), "Cannot find ZK realm for the path: " + TEST_INVALID_PATH);
+    }
+  }
+
+  /*
+   * Tests that exists() works on valid path and fails on invalid path.
+   */
+  @Test(dependsOnMethods = "testCreatePersistent")
+  public void testExists() {
+    Assert.assertTrue(_realmAwareZkClient.exists(TEST_REALM_ONE_VALID_PATH));
+
+    try {
+      _realmAwareZkClient.exists(TEST_INVALID_PATH);
+      Assert.fail("Exists() should not succeed on an invalid path!");
+    } catch (NoSuchElementException ex) {
+      Assert
+          .assertEquals(ex.getMessage(), "Cannot find ZK realm for the path: " + TEST_INVALID_PATH);
+    }
+  }
+
+  /*
+   * Tests that delete() works on valid path and fails on invalid path.
+   */
+  @Test(dependsOnMethods = "testExists")
+  public void testDelete() {
+    try {
+      _realmAwareZkClient.delete(TEST_INVALID_PATH);
+      Assert.fail("Exists() should not succeed on an invalid path!");
+    } catch (NoSuchElementException ex) {
+      Assert
+          .assertEquals(ex.getMessage(), "Cannot find ZK realm for the path: " + TEST_INVALID_PATH);
+    }
+
+    Assert.assertTrue(_realmAwareZkClient.delete(TEST_REALM_ONE_VALID_PATH));
+    Assert.assertFalse(_realmAwareZkClient.exists(TEST_REALM_ONE_VALID_PATH));
+  }
+
+  /*
+   * Tests that multi-realm feature.
+   */
+  @Test(dependsOnMethods = "testDelete")
+  public void testMultiRealmCRUD() {
+    ZNRecord realmOneZnRecord = new ZNRecord("realmOne");
+    realmOneZnRecord.setSimpleField("realmOne", "Value");
+
+    ZNRecord realmTwoZnRecord = new ZNRecord("realmTwo");
+    realmTwoZnRecord.setSimpleField("realmTwo", "Value");
+
+    // Writing on realmOne.
+    _realmAwareZkClient.createPersistent(TEST_REALM_ONE_VALID_PATH, true);
+    _realmAwareZkClient.writeData(TEST_REALM_ONE_VALID_PATH, realmOneZnRecord);
+
+    // RealmOne path is created but realmTwo path is not.
+    Assert.assertTrue(_realmAwareZkClient.exists(TEST_REALM_ONE_VALID_PATH));
+    Assert.assertFalse(_realmAwareZkClient.exists(TEST_REALM_TWO_VALID_PATH));
+
+    // Writing on realmTwo.
+    _realmAwareZkClient.createPersistent(TEST_REALM_TWO_VALID_PATH, true);
+    _realmAwareZkClient.writeData(TEST_REALM_TWO_VALID_PATH, realmTwoZnRecord);
+
+    // RealmTwo path is created.
+    Assert.assertTrue(_realmAwareZkClient.exists(TEST_REALM_TWO_VALID_PATH));
+
+    // Reading on both realms.
+    Assert.assertEquals(_realmAwareZkClient.readData(TEST_REALM_ONE_VALID_PATH), realmOneZnRecord);
+    Assert.assertEquals(_realmAwareZkClient.readData(TEST_REALM_TWO_VALID_PATH), realmTwoZnRecord);
+
+    Assert.assertTrue(_realmAwareZkClient.delete(TEST_REALM_ONE_VALID_PATH));
+    Assert.assertFalse(_realmAwareZkClient.exists(TEST_REALM_ONE_VALID_PATH));
+
+    // Deleting on realmOne does not delete on realmTwo.
+    Assert.assertTrue(_realmAwareZkClient.exists(TEST_REALM_TWO_VALID_PATH));
+
+    // Deleting on realmTwo.
+    Assert.assertTrue(_realmAwareZkClient.delete(TEST_REALM_TWO_VALID_PATH));
+    Assert.assertFalse(_realmAwareZkClient.exists(TEST_REALM_TWO_VALID_PATH));
+  }
+
+  /*
+   * Tests that close() works.
+   * TODO: test that all raw zkClients are closed after FederatedZkClient close() is called. This
+   *  could help avoid ZkClient leakage.
+   */
+  @Test(dependsOnMethods = "testMultiRealmCRUD")
+  public void testClose() {
+    Assert.assertFalse(_realmAwareZkClient.isClosed());
+
+    _realmAwareZkClient.close();
+
+    Assert.assertTrue(_realmAwareZkClient.isClosed());
+
+    // Client is closed, so operation should not be executed.
+    try {
+      _realmAwareZkClient.createPersistent(TEST_REALM_ONE_VALID_PATH);
+      Assert
+          .fail("createPersistent() should not be executed because RealmAwareZkClient is closed.");
+    } catch (IllegalStateException ex) {
+      Assert.assertEquals(ex.getMessage(), "FederatedZkClient is closed!");
+    }
+  }
+}