You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2020/04/08 22:54:09 UTC

[helix] 39/50: Make ZKHelixAdmin and ZKHelixManager Realm-aware (#846)

This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch zooscalability_merge
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 3c300be5dcfa4bb2b27d3f9c298b9fd56a65b54e
Author: Huizhi Lu <ih...@gmail.com>
AuthorDate: Thu Mar 12 09:44:38 2020 -0700

    Make ZKHelixAdmin and ZKHelixManager Realm-aware (#846)
    
    To make Helix Java APIs realm-aware, we need to make both ZKHelixAdmin and ZKHelixManager realm-aware. This commit adds a Builder to set client config and connection config for building realm-aware ZkClients underneath.
---
 .../java/org/apache/helix/SystemPropertyKeys.java  |   8 +
 .../apache/helix/manager/zk/CallbackHandler.java   |   7 +-
 .../helix/manager/zk/ParticipantManager.java       |   7 +-
 .../org/apache/helix/manager/zk/ZKHelixAdmin.java  | 206 +++++++++++++++++----
 .../apache/helix/manager/zk/ZKHelixManager.java    | 102 +++++++---
 .../test/java/org/apache/helix/ZkTestHelper.java   |  21 ++-
 .../integration/TestResourceGroupEndtoEnd.java     |   3 +-
 .../controller/TestControllerLeadershipChange.java |   4 +-
 .../manager/ClusterControllerManager.java          |   3 +-
 .../manager/ClusterDistributedController.java      |   3 +-
 .../manager/MockParticipantManager.java            |   3 +-
 .../helix/integration/manager/ZkTestManager.java   |   5 +-
 .../apache/helix/manager/zk/TestHandleSession.java |   5 +-
 .../impl/factory/HelixZkClientFactory.java         |   2 +-
 14 files changed, 292 insertions(+), 87 deletions(-)

diff --git a/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java b/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java
index a40dbe9..3b9d4a4 100644
--- a/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java
+++ b/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java
@@ -20,6 +20,14 @@ package org.apache.helix;
  */
 
 public class SystemPropertyKeys {
+  // Used to compose default values in HelixManagerProperty
+  public static final String HELIX_MANAGER_PROPERTIES = "helix-manager.properties";
+
+  public static final String HELIX_MANAGER_VERSION = "clustermanager.version";
+
+  // Used to compose default values in HelixCloudProperty when cloud provider is Azure
+  public static final String AZURE_CLOUD_PROPERTIES = "azure-cloud.properties";
+
   // Task Driver
   public static final String TASK_CONFIG_LIMITATION = "helixTask.configsLimitation";
 
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
index 82c9b29..bbb8788 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
@@ -64,6 +64,7 @@ import org.apache.helix.model.Message;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.monitoring.mbeans.HelixCallbackMonitor;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.zkclient.IZkChildListener;
 import org.apache.helix.zookeeper.zkclient.IZkDataListener;
@@ -107,7 +108,7 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
   private final Set<EventType> _eventTypes;
   private final HelixDataAccessor _accessor;
   private final ChangeType _changeType;
-  private final HelixZkClient _zkClient;
+  private final RealmAwareZkClient _zkClient;
   private final AtomicLong _lastNotificationTimeStamp;
   private final HelixManager _manager;
   private final PropertyKey _propertyKey;
@@ -191,12 +192,12 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
    */
   private List<NotificationContext.Type> _expectTypes = nextNotificationType.get(Type.FINALIZE);
 
-  public CallbackHandler(HelixManager manager, HelixZkClient client, PropertyKey propertyKey,
+  public CallbackHandler(HelixManager manager, RealmAwareZkClient client, PropertyKey propertyKey,
       Object listener, EventType[] eventTypes, ChangeType changeType) {
     this(manager, client, propertyKey, listener, eventTypes, changeType, null);
   }
 
-  public CallbackHandler(HelixManager manager, HelixZkClient client, PropertyKey propertyKey,
+  public CallbackHandler(HelixManager manager, RealmAwareZkClient client, PropertyKey propertyKey,
       Object listener, EventType[] eventTypes, ChangeType changeType,
       HelixCallbackMonitor monitor) {
     if (listener == null) {
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
index 9eef47e..8bb54ff 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
@@ -54,6 +54,7 @@ import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.participant.statemachine.ScheduledTaskStateModelFactory;
 import org.apache.helix.util.HelixUtil;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.datamodel.ZNRecordBucketizer;
 import org.apache.helix.zookeeper.zkclient.DataUpdater;
@@ -70,7 +71,7 @@ public class ParticipantManager {
   private static Logger LOG = LoggerFactory.getLogger(ParticipantManager.class);
   private static final String CLOUD_PROCESSOR_PATH_PREFIX = "org.apache.helix.cloud.";
 
-  final HelixZkClient _zkclient;
+  final RealmAwareZkClient _zkclient;
   final HelixManager _manager;
   final PropertyKey.Builder _keyBuilder;
   final String _clusterName;
@@ -91,14 +92,14 @@ public class ParticipantManager {
   private final String _sessionId;
 
   @Deprecated
-  public ParticipantManager(HelixManager manager, HelixZkClient zkclient, int sessionTimeout,
+  public ParticipantManager(HelixManager manager, RealmAwareZkClient zkclient, int sessionTimeout,
       LiveInstanceInfoProvider liveInstanceInfoProvider, List<PreConnectCallback> preConnectCallbacks,
       final String sessionId) {
     this(manager, zkclient, sessionTimeout, liveInstanceInfoProvider, preConnectCallbacks,
         sessionId, null);
   }
 
-  public ParticipantManager(HelixManager manager, HelixZkClient zkclient, int sessionTimeout,
+  public ParticipantManager(HelixManager manager, RealmAwareZkClient zkclient, int sessionTimeout,
       LiveInstanceInfoProvider liveInstanceInfoProvider,
       List<PreConnectCallback> preConnectCallbacks, final String sessionId,
       HelixManagerProperty helixManagerProperty) {
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 22a427b..7beaed8 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -47,9 +47,9 @@ import org.apache.helix.HelixDefinedState;
 import org.apache.helix.HelixException;
 import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyKey;
-import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.PropertyType;
+import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
 import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
 import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
@@ -75,12 +75,14 @@ import org.apache.helix.model.Message.MessageType;
 import org.apache.helix.model.PauseSignal;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
 import org.apache.helix.tools.DefaultIdealStateCalculator;
 import org.apache.helix.util.HelixUtil;
 import org.apache.helix.util.RebalanceUtil;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
 import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
 import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
 import org.apache.helix.zookeeper.zkclient.DataUpdater;
 import org.apache.helix.zookeeper.zkclient.exception.ZkException;
@@ -91,18 +93,27 @@ import org.slf4j.LoggerFactory;
 
 
 public class ZKHelixAdmin implements HelixAdmin {
+  private static final Logger LOG = LoggerFactory.getLogger(ZKHelixAdmin.class);
+
   public static final String CONNECTION_TIMEOUT = "helixAdmin.timeOutInSec";
   private static final String MAINTENANCE_ZNODE_ID = "maintenance";
   private static final int DEFAULT_SUPERCLUSTER_REPLICA = 3;
 
   private final RealmAwareZkClient _zkClient;
   private final ConfigAccessor _configAccessor;
-  // true if ZKHelixAdmin was instantiated with a HelixZkClient, false otherwise
+  // true if ZKHelixAdmin was instantiated with a RealmAwareZkClient, false otherwise
   // This is used for close() to determine how ZKHelixAdmin should close the underlying ZkClient
   private final boolean _usesExternalZkClient;
 
   private static Logger logger = LoggerFactory.getLogger(ZKHelixAdmin.class);
 
+  /**
+   * @deprecated it is recommended to use the builder constructor {@link Builder}
+   * instead to avoid having to manually create and maintain a RealmAwareZkClient
+   * outside of ZKHelixAdmin.
+   *
+   * @param zkClient A created RealmAwareZkClient
+   */
   @Deprecated
   public ZKHelixAdmin(RealmAwareZkClient zkClient) {
     _zkClient = zkClient;
@@ -110,14 +121,69 @@ public class ZKHelixAdmin implements HelixAdmin {
     _usesExternalZkClient = true;
   }
 
+  /**
+   * There are 2 realm-aware modes to connect to ZK:
+   * 1. if system property {@link SystemPropertyKeys#MULTI_ZK_ENABLED} is set to <code>"true"</code>,
+   * it will connect on multi-realm mode;
+   * 2. otherwise, it will connect on single-realm mode to the <code>zkAddress</code> provided.
+   *
+   * @param zkAddress ZK address
+   * @exception HelixException if not able to connect on multi-realm mode
+   *
+   * @deprecated it is recommended to use the builder constructor {@link Builder}
+   */
+  @Deprecated
   public ZKHelixAdmin(String zkAddress) {
     int timeOutInSec = Integer.parseInt(System.getProperty(CONNECTION_TIMEOUT, "30"));
-    HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
-    clientConfig.setZkSerializer(new ZNRecordSerializer())
-        .setConnectInitTimeout(timeOutInSec * 1000);
-    _zkClient = SharedZkClientFactory.getInstance()
-        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress), clientConfig);
-    _zkClient.waitUntilConnected(timeOutInSec, TimeUnit.SECONDS);
+    RealmAwareZkClient.RealmAwareZkClientConfig clientConfig =
+        new RealmAwareZkClient.RealmAwareZkClientConfig()
+            .setConnectInitTimeout(timeOutInSec * 1000L)
+            .setZkSerializer(new ZNRecordSerializer());
+
+    RealmAwareZkClient zkClient;
+
+    if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED)) {
+      try {
+        zkClient = new FederatedZkClient(
+            new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build(), clientConfig);
+      } catch (IllegalStateException | IOException | InvalidRoutingDataException e) {
+        throw new HelixException("Not able to connect on multi-realm mode.", e);
+      }
+    } else {
+      zkClient = SharedZkClientFactory.getInstance()
+          .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress),
+              clientConfig.createHelixZkClientConfig());
+      zkClient.waitUntilConnected(timeOutInSec, TimeUnit.SECONDS);
+    }
+
+    _zkClient = zkClient;
+    _configAccessor = new ConfigAccessor(_zkClient);
+    _usesExternalZkClient = false;
+  }
+
+  private ZKHelixAdmin(Builder builder) {
+    RealmAwareZkClient zkClient;
+    switch (builder.realmMode) {
+      case MULTI_REALM:
+        try {
+          zkClient = new FederatedZkClient(builder.realmAwareZkConnectionConfig,
+              builder.realmAwareZkClientConfig);
+        } catch (IOException | InvalidRoutingDataException | IllegalStateException e) {
+          throw new HelixException("Not able to connect on multi-realm mode.", e);
+        }
+        break;
+      case SINGLE_REALM:
+        // Create a HelixZkClient: Use a SharedZkClient because ZKHelixAdmin does not need to do
+        // ephemeral operations
+        zkClient = SharedZkClientFactory.getInstance()
+            .buildZkClient(new HelixZkClient.ZkConnectionConfig(builder.zkAddress),
+                builder.realmAwareZkClientConfig.createHelixZkClientConfig());
+        break;
+      default:
+        throw new HelixException("Invalid RealmMode given: " + builder.realmMode);
+    }
+
+    _zkClient = zkClient;
     _configAccessor = new ConfigAccessor(_zkClient);
     _usesExternalZkClient = false;
   }
@@ -207,7 +273,7 @@ public class ZKHelixAdmin implements HelixAdmin {
 
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
     return accessor.getProperty(keyBuilder.instanceConfig(instanceName));
   }
@@ -383,7 +449,7 @@ public class ZKHelixAdmin implements HelixAdmin {
         reason == null ? "NULL" : reason);
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
     if (enabled) {
       accessor.removeProperty(keyBuilder.pause());
@@ -408,7 +474,7 @@ public class ZKHelixAdmin implements HelixAdmin {
   public boolean isInMaintenanceMode(String clusterName) {
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
     return accessor.getBaseDataAccessor()
         .exists(keyBuilder.maintenance().getPath(), AccessOption.PERSISTENT);
   }
@@ -449,7 +515,7 @@ public class ZKHelixAdmin implements HelixAdmin {
       final MaintenanceSignal.TriggeringEntity triggeringEntity) {
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
     logger.info("Cluster {} {} {} maintenance mode for reason {}.", clusterName,
         triggeringEntity == MaintenanceSignal.TriggeringEntity.CONTROLLER ? "automatically"
             : "manually", enabled ? "enters" : "exits", reason == null ? "NULL" : reason);
@@ -517,7 +583,7 @@ public class ZKHelixAdmin implements HelixAdmin {
         instanceName, clusterName);
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
     // check the instance is alive
     LiveInstance liveInstance = accessor.getProperty(keyBuilder.liveInstance(instanceName));
@@ -635,7 +701,7 @@ public class ZKHelixAdmin implements HelixAdmin {
         instanceNames == null ? "NULL" : HelixUtil.serializeByComma(instanceNames), clusterName);
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
     List<ExternalView> extViews = accessor.getChildValues(keyBuilder.externalViews());
 
     Set<String> resetInstanceNames = new HashSet<String>(instanceNames);
@@ -663,7 +729,7 @@ public class ZKHelixAdmin implements HelixAdmin {
         resourceNames == null ? "NULL" : HelixUtil.serializeByComma(resourceNames), clusterName);
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
     List<ExternalView> extViews = accessor.getChildValues(keyBuilder.externalViews());
 
     Set<String> resetResourceNames = new HashSet<String>(resourceNames);
@@ -791,7 +857,7 @@ public class ZKHelixAdmin implements HelixAdmin {
 
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
     for (String instanceName : instances) {
       InstanceConfig config = accessor.getProperty(keyBuilder.instanceConfig(instanceName));
@@ -910,7 +976,7 @@ public class ZKHelixAdmin implements HelixAdmin {
 
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
     for (String resourceName : getResourcesInCluster(clusterName)) {
       IdealState is = accessor.getProperty(keyBuilder.idealStates(resourceName));
@@ -926,7 +992,7 @@ public class ZKHelixAdmin implements HelixAdmin {
   public IdealState getResourceIdealState(String clusterName, String resourceName) {
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
     return accessor.getProperty(keyBuilder.idealStates(resourceName));
   }
@@ -939,7 +1005,7 @@ public class ZKHelixAdmin implements HelixAdmin {
             clusterName, idealState == null ? "NULL" : idealState.toString());
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
     accessor.setProperty(keyBuilder.idealStates(resourceName), idealState);
   }
@@ -982,7 +1048,7 @@ public class ZKHelixAdmin implements HelixAdmin {
   public ExternalView getResourceExternalView(String clusterName, String resourceName) {
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
     return accessor.getProperty(keyBuilder.externalView(resourceName));
   }
 
@@ -1016,7 +1082,7 @@ public class ZKHelixAdmin implements HelixAdmin {
 
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
     accessor.setProperty(keyBuilder.stateModelDef(stateModelDef), stateModel);
   }
 
@@ -1025,7 +1091,7 @@ public class ZKHelixAdmin implements HelixAdmin {
     logger.info("Drop resource {} from cluster {}", resourceName, clusterName);
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
     accessor.removeProperty(keyBuilder.idealStates(resourceName));
     accessor.removeProperty(keyBuilder.resourceConfig(resourceName));
@@ -1045,7 +1111,7 @@ public class ZKHelixAdmin implements HelixAdmin {
 
     ZKHelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
     accessor.setProperty(keyBuilder.cloudConfig(), cloudConfigBuilder);
   }
 
@@ -1054,7 +1120,7 @@ public class ZKHelixAdmin implements HelixAdmin {
     logger.info("Remove Cloud Config for cluster {}.", clusterName);
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
     accessor.removeProperty(keyBuilder.cloudConfig());
   }
 
@@ -1067,7 +1133,7 @@ public class ZKHelixAdmin implements HelixAdmin {
   public StateModelDefinition getStateModelDef(String clusterName, String stateModelName) {
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
     return accessor.getProperty(keyBuilder.stateModelDef(stateModelName));
   }
@@ -1077,7 +1143,7 @@ public class ZKHelixAdmin implements HelixAdmin {
     logger.info("Deleting cluster {}.", clusterName);
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
     String root = "/" + clusterName;
     if (accessor.getChildNames(keyBuilder.liveInstances()).size() > 0) {
@@ -1121,7 +1187,7 @@ public class ZKHelixAdmin implements HelixAdmin {
 
     ZKHelixDataAccessor accessor =
         new ZKHelixDataAccessor(grandCluster, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
     accessor.setProperty(keyBuilder.idealStates(idealState.getResourceName()), idealState);
   }
@@ -1316,7 +1382,7 @@ public class ZKHelixAdmin implements HelixAdmin {
         constraintId, clusterName);
     BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
 
-    Builder keyBuilder = new Builder(clusterName);
+    PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
     String path = keyBuilder.constraint(constraintType.toString()).getPath();
 
     baseAccessor.update(path, new DataUpdater<ZNRecord>() {
@@ -1339,7 +1405,7 @@ public class ZKHelixAdmin implements HelixAdmin {
         constraintId, clusterName);
     BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
 
-    Builder keyBuilder = new Builder(clusterName);
+    PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
     String path = keyBuilder.constraint(constraintType.toString()).getPath();
 
     baseAccessor.update(path, new DataUpdater<ZNRecord>() {
@@ -1361,7 +1427,7 @@ public class ZKHelixAdmin implements HelixAdmin {
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
 
-    Builder keyBuilder = new Builder(clusterName);
+    PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
     return accessor.getProperty(keyBuilder.constraint(constraintType.toString()));
   }
 
@@ -1443,7 +1509,7 @@ public class ZKHelixAdmin implements HelixAdmin {
     }
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
     InstanceConfig config = accessor.getProperty(keyBuilder.instanceConfig(instanceName));
     config.addTag(tag);
@@ -1464,7 +1530,7 @@ public class ZKHelixAdmin implements HelixAdmin {
     }
     ZKHelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
     InstanceConfig config = accessor.getProperty(keyBuilder.instanceConfig(instanceName));
     config.removeTag(tag);
@@ -1485,7 +1551,7 @@ public class ZKHelixAdmin implements HelixAdmin {
     }
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
     InstanceConfig config = accessor.getProperty(keyBuilder.instanceConfig(instanceName));
     config.setZoneId(zoneId);
@@ -1677,7 +1743,7 @@ public class ZKHelixAdmin implements HelixAdmin {
 
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
     List<IdealState> idealStates = accessor.getChildValues(keyBuilder.idealStates());
     List<String> nullIdealStates = new ArrayList<>();
     for (int i = 0; i < idealStates.size(); i++) {
@@ -1718,7 +1784,7 @@ public class ZKHelixAdmin implements HelixAdmin {
     // Ensure that all instances are valid
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
     List<String> instances = accessor.getChildNames(keyBuilder.instanceConfigs());
     if (validateInstancesForWagedRebalance(clusterName, instances).containsValue(false)) {
       throw new HelixException(String
@@ -1828,4 +1894,74 @@ public class ZKHelixAdmin implements HelixAdmin {
             clusterConfig));
     return true;
   }
+
+  // TODO: refactor builder to reduce duplicate code with other Helix Java APIs
+  public static class Builder {
+    private String zkAddress;
+    private RealmAwareZkClient.RealmMode realmMode;
+    private RealmAwareZkClient.RealmAwareZkConnectionConfig realmAwareZkConnectionConfig;
+    private RealmAwareZkClient.RealmAwareZkClientConfig realmAwareZkClientConfig;
+
+    public Builder() {
+    }
+
+    public ZKHelixAdmin.Builder setZkAddress(String zkAddress) {
+      this.zkAddress = zkAddress;
+      return this;
+    }
+
+    public ZKHelixAdmin.Builder setRealmMode(RealmAwareZkClient.RealmMode realmMode) {
+      this.realmMode = realmMode;
+      return this;
+    }
+
+    public ZKHelixAdmin.Builder setRealmAwareZkConnectionConfig(
+        RealmAwareZkClient.RealmAwareZkConnectionConfig realmAwareZkConnectionConfig) {
+      realmAwareZkConnectionConfig = realmAwareZkConnectionConfig;
+      return this;
+    }
+
+    public ZKHelixAdmin.Builder setRealmAwareZkClientConfig(
+        RealmAwareZkClient.RealmAwareZkClientConfig realmAwareZkClientConfig) {
+      realmAwareZkClientConfig = realmAwareZkClientConfig;
+      return this;
+    }
+
+    public ZKHelixAdmin build() {
+      validate();
+      return new ZKHelixAdmin(this);
+    }
+
+    /*
+     * Validates the given parameters before creating an instance of ZKHelixAdmin.
+     */
+    private void validate() {
+      // Resolve RealmMode based on other parameters
+      boolean isZkAddressSet = zkAddress != null && !zkAddress.isEmpty();
+      if (realmMode == RealmAwareZkClient.RealmMode.SINGLE_REALM && !isZkAddressSet) {
+        throw new HelixException(
+            "RealmMode cannot be single-realm without a valid ZkAddress set!");
+      }
+      if (realmMode == RealmAwareZkClient.RealmMode.MULTI_REALM && isZkAddressSet) {
+        throw new HelixException(
+            "ZkAddress cannot be set on multi-realm mode!");
+      }
+      if (realmMode == null) {
+        realmMode = isZkAddressSet ? RealmAwareZkClient.RealmMode.SINGLE_REALM
+            : RealmAwareZkClient.RealmMode.MULTI_REALM;
+      }
+
+      // Resolve RealmAwareZkClientConfig
+      if (realmAwareZkClientConfig == null) {
+        realmAwareZkClientConfig = new RealmAwareZkClient.RealmAwareZkClientConfig();
+      }
+
+      // Resolve RealmAwareZkConnectionConfig
+      if (realmAwareZkConnectionConfig == null) {
+        // If not set, create a default one
+        realmAwareZkConnectionConfig =
+            new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build();
+      }
+    }
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index 789e922..72ce07f 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -19,6 +19,7 @@ package org.apache.helix.manager.zk;
  * under the License.
  */
 
+import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
@@ -75,14 +76,17 @@ import org.apache.helix.model.LiveInstance;
 import org.apache.helix.monitoring.ZKPathDataDumpTask;
 import org.apache.helix.monitoring.mbeans.HelixCallbackMonitor;
 import org.apache.helix.monitoring.mbeans.MonitorLevel;
+import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
 import org.apache.helix.participant.HelixStateMachineEngine;
 import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.store.zk.AutoFallbackPropertyStore;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.util.HelixUtil;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
+import org.apache.helix.zookeeper.impl.factory.HelixZkClientFactory;
 import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
 import org.apache.helix.zookeeper.zkclient.IZkStateListener;
 import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
@@ -120,7 +124,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
   private final String _version;
   private int _reportLatency;
 
-  protected HelixZkClient _zkclient = null;
+  protected RealmAwareZkClient _zkclient;
   private final DefaultMessagingService _messagingService;
   private Map<ChangeType, HelixCallbackMonitor> _callbackMonitors;
 
@@ -663,30 +667,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
   }
 
   void createClient() throws Exception {
-    PathBasedZkSerializer zkSerializer =
-        ChainedPathZkSerializer.builder(new ZNRecordSerializer()).build();
-
-    HelixZkClient.ZkConnectionConfig connectionConfig = new HelixZkClient.ZkConnectionConfig(_zkAddress);
-    connectionConfig.setSessionTimeout(_sessionTimeout);
-    HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
-    clientConfig
-        .setZkSerializer(zkSerializer)
-        .setConnectInitTimeout(_connectionInitTimeout)
-        .setMonitorType(_instanceType.name())
-        .setMonitorKey(_clusterName)
-        .setMonitorInstanceName(_instanceName)
-        .setMonitorRootPathOnly(isMonitorRootPathOnly());
-
-    HelixZkClient newClient;
-    switch (_instanceType) {
-    case ADMINISTRATOR:
-      newClient = SharedZkClientFactory.getInstance().buildZkClient(connectionConfig, clientConfig);
-      break;
-    default:
-      newClient = DedicatedZkClientFactory
-          .getInstance().buildZkClient(connectionConfig, clientConfig);
-      break;
-    }
+    final RealmAwareZkClient newClient = createSingleRealmZkClient();
 
     synchronized (this) {
       if (_zkclient != null) {
@@ -1296,4 +1277,75 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
   public Long getSessionStartTime() {
     return _sessionStartTime;
   }
+
+  /*
+   * Prepares connection config and client config based on the internal parameters given to
+   * HelixManager in order to create a ZkClient instance to use. Note that a shared ZkClient
+   * instance will be created if connecting as an ADMINISTRATOR to minimize the cost of creating
+   * ZkConnections.
+   */
+  private RealmAwareZkClient createSingleRealmZkClient() {
+    final String shardingKey = buildShardingKey();
+    PathBasedZkSerializer zkSerializer =
+        ChainedPathZkSerializer.builder(new ZNRecordSerializer()).build();
+
+    RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig =
+        new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder()
+            .setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
+            .setZkRealmShardingKey(shardingKey)
+            .setSessionTimeout(_sessionTimeout).build();
+
+    RealmAwareZkClient.RealmAwareZkClientConfig clientConfig =
+        new RealmAwareZkClient.RealmAwareZkClientConfig();
+
+    clientConfig.setZkSerializer(zkSerializer)
+        .setConnectInitTimeout(_connectionInitTimeout)
+        .setMonitorType(_instanceType.name())
+        .setMonitorKey(_clusterName)
+        .setMonitorInstanceName(_instanceName)
+        .setMonitorRootPathOnly(isMonitorRootPathOnly());
+
+    if (_instanceType == InstanceType.ADMINISTRATOR) {
+      return resolveZkClient(SharedZkClientFactory.getInstance(), connectionConfig,
+          clientConfig);
+    }
+
+    return resolveZkClient(DedicatedZkClientFactory.getInstance(), connectionConfig,
+        clientConfig);
+  }
+
+  /*
+   * Resolves what type of ZkClient this HelixManager should use based on whether MULTI_ZK_ENABLED
+   * System config is set or not. Two types of ZkClients are available:
+   * 1) If MULTI_ZK_ENABLED is set to true, we create a dedicated RealmAwareZkClient
+   * that provides full ZkClient functionalities and connects to the correct ZK by querying
+   * MetadataStoreDirectoryService.
+   * 2) Otherwise, we create a dedicated HelixZkClient which plainly connects to
+   * the ZK address given.
+   */
+  private RealmAwareZkClient resolveZkClient(HelixZkClientFactory zkClientFactory,
+      RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig,
+      RealmAwareZkClient.RealmAwareZkClientConfig clientConfig) {
+    if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED)) {
+      try {
+        // Create realm-aware ZkClient.
+        return zkClientFactory.buildZkClient(connectionConfig, clientConfig);
+      } catch (IllegalArgumentException | IOException | InvalidRoutingDataException e) {
+        throw new HelixException("Not able to connect on realm-aware mode for sharding key: "
+            + connectionConfig.getZkRealmShardingKey(), e);
+      }
+    }
+
+    // If multi-zk mode is not enabled, create HelixZkClient with the provided zk address.
+    HelixZkClient.ZkClientConfig helixZkClientConfig = clientConfig.createHelixZkClientConfig();
+    HelixZkClient.ZkConnectionConfig helixZkConnectionConfig =
+        new HelixZkClient.ZkConnectionConfig(_zkAddress)
+            .setSessionTimeout(connectionConfig.getSessionTimeout());
+
+    return zkClientFactory.buildZkClient(helixZkConnectionConfig, helixZkClientConfig);
+  }
+
+  private String buildShardingKey() {
+    return _clusterName.charAt(0) == '/' ? _clusterName : "/" + _clusterName;
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java b/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
index c2b3d35..73dded4 100644
--- a/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
+++ b/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
@@ -43,6 +43,7 @@ import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
 import org.apache.helix.model.ExternalView;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.apache.helix.zookeeper.zkclient.IZkChildListener;
 import org.apache.helix.zookeeper.zkclient.IZkDataListener;
 import org.apache.helix.zookeeper.zkclient.IZkStateListener;
@@ -71,7 +72,7 @@ public class ZkTestHelper {
   /**
    * Simulate a zk state change by calling {@link ZkClient#process(WatchedEvent)} directly
    */
-  public static void simulateZkStateReconnected(HelixZkClient client) {
+  public static void simulateZkStateReconnected(RealmAwareZkClient client) {
     ZkClient zkClient = (ZkClient) client;
     WatchedEvent event = new WatchedEvent(EventType.None, KeeperState.Disconnected, null);
     zkClient.process(event);
@@ -84,7 +85,7 @@ public class ZkTestHelper {
    * @param client
    * @return
    */
-  public static String getSessionId(HelixZkClient client) {
+  public static String getSessionId(RealmAwareZkClient client) {
     ZkConnection connection = (ZkConnection) ((ZkClient) client).getConnection();
     ZooKeeper curZookeeper = connection.getZookeeper();
     return Long.toHexString(curZookeeper.getSessionId());
@@ -146,7 +147,7 @@ public class ZkTestHelper {
     LOG.info("After expiry. sessionId: " + Long.toHexString(curZookeeper.getSessionId()));
   }
 
-  public static void expireSession(HelixZkClient client) throws Exception {
+  public static void expireSession(RealmAwareZkClient client) throws Exception {
     final CountDownLatch waitNewSession = new CountDownLatch(1);
     final ZkClient zkClient = (ZkClient) client;
 
@@ -213,7 +214,7 @@ public class ZkTestHelper {
    * @param client
    * @throws Exception
    */
-  public static void asyncExpireSession(HelixZkClient client) throws Exception {
+  public static void asyncExpireSession(RealmAwareZkClient client) throws Exception {
     final ZkClient zkClient = (ZkClient) client;
     ZkConnection connection = ((ZkConnection) zkClient.getConnection());
     ZooKeeper curZookeeper = connection.getZookeeper();
@@ -245,7 +246,7 @@ public class ZkTestHelper {
   /*
    * stateMap: partition->instance->state
    */
-  public static boolean verifyState(HelixZkClient zkclient, String clusterName, String resourceName,
+  public static boolean verifyState(RealmAwareZkClient zkclient, String clusterName, String resourceName,
       Map<String, Map<String, String>> expectStateMap, String op) {
     boolean result = true;
     ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(zkclient);
@@ -391,7 +392,7 @@ public class ZkTestHelper {
     }
   }
 
-  public static Map<String, List<String>> getZkWatch(HelixZkClient client) throws Exception {
+  public static Map<String, List<String>> getZkWatch(RealmAwareZkClient client) throws Exception {
     Map<String, List<String>> lists = new HashMap<String, List<String>>();
     ZkClient zkClient = (ZkClient) client;
 
@@ -424,7 +425,7 @@ public class ZkTestHelper {
     return lists;
   }
 
-  public static Map<String, Set<IZkDataListener>> getZkDataListener(HelixZkClient client)
+  public static Map<String, Set<IZkDataListener>> getZkDataListener(RealmAwareZkClient client)
       throws Exception {
     java.lang.reflect.Field field = getField(client.getClass(), "_dataListener");
     field.setAccessible(true);
@@ -433,7 +434,7 @@ public class ZkTestHelper {
     return dataListener;
   }
 
-  public static Map<String, Set<IZkChildListener>> getZkChildListener(HelixZkClient client)
+  public static Map<String, Set<IZkChildListener>> getZkChildListener(RealmAwareZkClient client)
       throws Exception {
     java.lang.reflect.Field field = getField(client.getClass(), "_childListener");
     field.setAccessible(true);
@@ -442,7 +443,7 @@ public class ZkTestHelper {
     return childListener;
   }
 
-  public static boolean tryWaitZkEventsCleaned(HelixZkClient zkclient) throws Exception {
+  public static boolean tryWaitZkEventsCleaned(RealmAwareZkClient zkclient) throws Exception {
     java.lang.reflect.Field field = getField(zkclient.getClass(), "_eventThread");
     field.setAccessible(true);
     Object eventThread = field.get(zkclient);
@@ -468,7 +469,7 @@ public class ZkTestHelper {
     return false;
   }
 
-  public static void injectExpire(HelixZkClient client)
+  public static void injectExpire(RealmAwareZkClient client)
       throws ExecutionException, InterruptedException {
     final ZkClient zkClient = (ZkClient) client;
     Future future = _executor.submit(new Runnable() {
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java b/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java
index 0aa5787..0313a77 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java
@@ -47,6 +47,7 @@ import org.apache.helix.participant.statemachine.StateModel;
 import org.apache.helix.participant.statemachine.StateModelFactory;
 import org.apache.helix.spectator.RoutingTableProvider;
 import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -427,7 +428,7 @@ public class TestResourceGroupEndtoEnd extends ZkTestBase {
     }
 
     @Override
-    public HelixZkClient getZkClient() {
+    public RealmAwareZkClient getZkClient() {
       return _zkclient;
     }
 
diff --git a/helix-core/src/test/java/org/apache/helix/integration/controller/TestControllerLeadershipChange.java b/helix-core/src/test/java/org/apache/helix/integration/controller/TestControllerLeadershipChange.java
index 8a1ff03..5aa9a91 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/controller/TestControllerLeadershipChange.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/controller/TestControllerLeadershipChange.java
@@ -34,12 +34,12 @@ import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.CallbackHandler;
-import org.apache.helix.zookeeper.api.client.HelixZkClient;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
 import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -148,7 +148,7 @@ public class TestControllerLeadershipChange extends ZkTestBase {
     callbackHandlers.forEach(callbackHandler -> Assert.assertTrue(callbackHandler.isReady()));
 
     // check the zk connection is open
-    HelixZkClient zkClient = controller.getZkClient();
+    RealmAwareZkClient zkClient = controller.getZkClient();
     Assert.assertFalse(zkClient.isClosed());
     Long sessionId = zkClient.getSessionId();
     Assert.assertNotNull(sessionId);
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
index 7d810fb..9281e2d 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
@@ -27,6 +27,7 @@ import org.apache.helix.InstanceType;
 import org.apache.helix.manager.zk.CallbackHandler;
 import org.apache.helix.manager.zk.ZKHelixManager;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -94,7 +95,7 @@ public class ClusterControllerManager extends ZKHelixManager implements Runnable
   }
 
   @Override
-  public HelixZkClient getZkClient() {
+  public RealmAwareZkClient getZkClient() {
     return _zkclient;
   }
 
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java
index 8e15928..397fae5 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java
@@ -28,6 +28,7 @@ import org.apache.helix.manager.zk.ZKHelixManager;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
 import org.apache.helix.participant.DistClusterControllerStateModelFactory;
 import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -83,7 +84,7 @@ public class ClusterDistributedController extends ZKHelixManager implements Runn
   }
 
   @Override
-  public HelixZkClient getZkClient() {
+  public RealmAwareZkClient getZkClient() {
     return _zkclient;
   }
 
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
index 4f1b966..4a44502 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
@@ -34,6 +34,7 @@ import org.apache.helix.mock.participant.MockSchemataModelFactory;
 import org.apache.helix.mock.participant.MockTransition;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -136,7 +137,7 @@ public class MockParticipantManager extends ZKHelixManager implements Runnable,
   }
 
   @Override
-  public HelixZkClient getZkClient() {
+  public RealmAwareZkClient getZkClient() {
     return _zkclient;
   }
 
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/ZkTestManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/ZkTestManager.java
index 1a6903a..93e4b53 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/ZkTestManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/ZkTestManager.java
@@ -22,10 +22,11 @@ package org.apache.helix.integration.manager;
 import java.util.List;
 
 import org.apache.helix.manager.zk.CallbackHandler;
-import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+
 
 public interface ZkTestManager {
-  HelixZkClient getZkClient();
+  RealmAwareZkClient getZkClient();
 
   List<CallbackHandler> getHandlers();
 
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleSession.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleSession.java
index 92eb3c4..54d38ee 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleSession.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleSession.java
@@ -41,6 +41,7 @@ import org.apache.helix.controller.GenericHelixController;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.apache.helix.zookeeper.impl.client.ZkClient;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -570,7 +571,7 @@ public class TestHandleSession extends ZkTestBase {
       return _handlers;
     }
 
-    HelixZkClient getZkClient() {
+    RealmAwareZkClient getZkClient() {
       return _zkclient;
     }
 
@@ -616,7 +617,7 @@ public class TestHandleSession extends ZkTestBase {
       return _handlers;
     }
 
-    HelixZkClient getZkClient() {
+    RealmAwareZkClient getZkClient() {
       return _zkclient;
     }
 
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/HelixZkClientFactory.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/HelixZkClientFactory.java
index ca13321..1f792ee 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/HelixZkClientFactory.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/HelixZkClientFactory.java
@@ -29,7 +29,7 @@ import org.apache.helix.zookeeper.zkclient.ZkConnection;
 /**
  * Abstract class of the ZkClient factory.
  */
-abstract class HelixZkClientFactory implements RealmAwareZkClientFactory {
+public abstract class HelixZkClientFactory implements RealmAwareZkClientFactory {
 
   /**
    * Build a ZkClient using specified connection config and client config