You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2020/08/13 22:31:31 UTC

[helix] branch master updated: Add HelixManager constructor with RealmAwareZkConnectionConfig (#1183)

This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new eeb738d  Add HelixManager constructor with RealmAwareZkConnectionConfig (#1183)
eeb738d is described below

commit eeb738dbd8090516bf81b783b41b770774f92a76
Author: Hunter Lee <hu...@linkedin.com>
AuthorDate: Thu Aug 13 15:31:24 2020 -0700

    Add HelixManager constructor with RealmAwareZkConnectionConfig (#1183)
    
    Previously, there wasn't a way to use HelixManager with a custom routing data ZK connection config. This new way of constructing HelixManager allows users to use HelixManager with a custom RealmAwareZkConnectionConfig.
    
    This PR also adds a null check for zkAddress to existing Helix APIs so that they know when to start themselves in multi-ZK mode.
---
 .../main/java/org/apache/helix/ConfigAccessor.java |  2 +-
 .../java/org/apache/helix/HelixManagerFactory.java | 24 ++++--
 .../org/apache/helix/HelixManagerProperty.java     | 80 +++++++++++++++---
 .../org/apache/helix/HelixPropertyFactory.java     |  5 +-
 .../org/apache/helix/manager/zk/ZKHelixAdmin.java  |  6 +-
 .../apache/helix/manager/zk/ZKHelixManager.java    | 95 +++++++++++++++++-----
 .../java/org/apache/helix/manager/zk/ZKUtil.java   |  6 +-
 .../helix/manager/zk/ZkBaseDataAccessor.java       |  2 +-
 .../helix/manager/zk/ZkBucketDataAccessor.java     |  5 +-
 .../java/org/apache/helix/tools/ClusterSetup.java  |  2 +-
 .../ClusterVerifiers/ZkHelixClusterVerifier.java   |  4 +-
 .../multizk/TestMultiZkHelixJavaApis.java          | 68 ++++++++++++++--
 12 files changed, 238 insertions(+), 61 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
index bc271c5..48bcfd4 100644
--- a/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
@@ -113,7 +113,7 @@ public class ConfigAccessor {
     _usesExternalZkClient = false;
 
     // If the multi ZK config is enabled, use FederatedZkClient on multi-realm mode
-    if (Boolean.parseBoolean(System.getProperty(SystemPropertyKeys.MULTI_ZK_ENABLED))) {
+    if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED) || zkAddress == null) {
       try {
         _zkClient = new FederatedZkClient(
             new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build(),
diff --git a/helix-core/src/main/java/org/apache/helix/HelixManagerFactory.java b/helix-core/src/main/java/org/apache/helix/HelixManagerFactory.java
index 2933808..7150928 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixManagerFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixManagerFactory.java
@@ -19,19 +19,15 @@ package org.apache.helix;
  * under the License.
  */
 
-/**
- * factory that creates cluster managers
- *
- * for zk-based cluster managers, the getZKXXX(..zkClient) that takes a zkClient parameter
- *   are intended for session expiry test purpose
- */
 import org.apache.helix.manager.zk.HelixManagerStateListener;
 import org.apache.helix.manager.zk.ZKHelixManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * Obtain one of a set of Helix cluster managers, organized by the backing system.
+ * Factory that creates cluster managers.
  */
 public final class HelixManagerFactory {
   private static final Logger LOG = LoggerFactory.getLogger(HelixManagerFactory.class);
@@ -65,4 +61,20 @@ public final class HelixManagerFactory {
     return new ZKHelixManager(clusterName, instanceName, type, zkAddr, stateListener);
   }
 
+  /**
+   * Construct a ZkHelixManager using the HelixManagerProperty instance given.
+   * HelixManagerProperty given must contain a valid ZkConnectionConfig.
+   * @param clusterName
+   * @param instanceName
+   * @param type
+   * @param stateListener
+   * @param helixManagerProperty must contain a valid ZkConnectionConfig
+   * @return
+   */
+  public static HelixManager getZKHelixManager(String clusterName, String instanceName,
+      InstanceType type, HelixManagerStateListener stateListener,
+      HelixManagerProperty helixManagerProperty) {
+    return new ZKHelixManager(clusterName, instanceName, type, null, stateListener,
+        helixManagerProperty);
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/HelixManagerProperty.java b/helix-core/src/main/java/org/apache/helix/HelixManagerProperty.java
index 2ec26a7..f566b99 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixManagerProperty.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixManagerProperty.java
@@ -20,32 +20,51 @@ package org.apache.helix;
  */
 
 import java.util.Properties;
+
 import org.apache.helix.model.CloudConfig;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
- * Hold Helix manager properties. The manager properties further hold Helix cloud properties
- * and some other properties specific for the manager.
+ * HelixManagerProperty is a general property/config object used for HelixManager creation.
  */
 public class HelixManagerProperty {
   private static final Logger LOG = LoggerFactory.getLogger(HelixManagerProperty.class.getName());
   private String _version;
   private long _healthReportLatency;
   private HelixCloudProperty _helixCloudProperty;
+  private RealmAwareZkClient.RealmAwareZkConnectionConfig _zkConnectionConfig;
+  private RealmAwareZkClient.RealmAwareZkClientConfig _zkClientConfig;
 
   /**
+   * ** Deprecated - HelixManagerProperty should be a general property/config object used for
+   * HelixManager creation, not tied only to Properties or CloudConfig **
+   *
    * Initialize Helix manager property with default value
    * @param helixManagerProperties helix manager related properties input as a map
    * @param cloudConfig cloudConfig read from Zookeeper
    */
+  @Deprecated
   public HelixManagerProperty(Properties helixManagerProperties, CloudConfig cloudConfig) {
     _helixCloudProperty = new HelixCloudProperty(cloudConfig);
-    setVersion(helixManagerProperties.getProperty(SystemPropertyKeys.HELIX_MANAGER_VERSION));
-    setHealthReportLatency(
+    _version = helixManagerProperties.getProperty(SystemPropertyKeys.HELIX_MANAGER_VERSION);
+    _healthReportLatency = Long.parseLong(
         helixManagerProperties.getProperty(SystemPropertyKeys.PARTICIPANT_HEALTH_REPORT_LATENCY));
   }
 
+  private HelixManagerProperty(String version, long healthReportLatency,
+      HelixCloudProperty helixCloudProperty,
+      RealmAwareZkClient.RealmAwareZkConnectionConfig zkConnectionConfig,
+      RealmAwareZkClient.RealmAwareZkClientConfig zkClientConfig) {
+    _version = version;
+    _healthReportLatency = healthReportLatency;
+    _helixCloudProperty = helixCloudProperty;
+    _zkConnectionConfig = zkConnectionConfig;
+    _zkClientConfig = zkClientConfig;
+  }
+
   public HelixCloudProperty getHelixCloudProperty() {
     return _helixCloudProperty;
   }
@@ -58,17 +77,54 @@ public class HelixManagerProperty {
     return _healthReportLatency;
   }
 
-  public void setHelixCloudProperty(HelixCloudProperty helixCloudProperty) {
-    _helixCloudProperty = helixCloudProperty;
+  public RealmAwareZkClient.RealmAwareZkConnectionConfig getZkConnectionConfig() {
+    return _zkConnectionConfig;
   }
 
-  public void setVersion(String version) {
-    _version = version;
+  public RealmAwareZkClient.RealmAwareZkClientConfig getZkClientConfig() {
+    return _zkClientConfig;
   }
 
-  public void setHealthReportLatency(String latency) {
-    _healthReportLatency = Long.valueOf(latency);
-  }
+  public static class Builder {
+    private String _version;
+    private long _healthReportLatency;
+    private HelixCloudProperty _helixCloudProperty;
+    private RealmAwareZkClient.RealmAwareZkConnectionConfig _zkConnectionConfig;
+    private RealmAwareZkClient.RealmAwareZkClientConfig _zkClientConfig;
+
+    public Builder() {
+    }
+
+    public HelixManagerProperty build() {
+      return new HelixManagerProperty(_version, _healthReportLatency, _helixCloudProperty,
+          _zkConnectionConfig, _zkClientConfig);
+    }
+
+    public Builder setVersion(String version) {
+      _version = version;
+      return this;
+    }
 
-  // TODO: migrate all other participant related properties to this file.
+    public Builder setHealthReportLatency(long healthReportLatency) {
+      _healthReportLatency = healthReportLatency;
+      return this;
+    }
+
+    public Builder setHelixCloudProperty(HelixCloudProperty helixCloudProperty) {
+      _helixCloudProperty = helixCloudProperty;
+      return this;
+    }
+
+    public Builder setRealmAWareZkConnectionConfig(
+        RealmAwareZkClient.RealmAwareZkConnectionConfig zkConnectionConfig) {
+      _zkConnectionConfig = zkConnectionConfig;
+      return this;
+    }
+
+    public Builder setRealmAwareZkClientConfig(
+        RealmAwareZkClient.RealmAwareZkClientConfig zkClientConfig) {
+      _zkClientConfig = zkClientConfig;
+      return this;
+    }
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/HelixPropertyFactory.java b/helix-core/src/main/java/org/apache/helix/HelixPropertyFactory.java
index cbdc643..fa01d79 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixPropertyFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixPropertyFactory.java
@@ -88,8 +88,9 @@ public final class HelixPropertyFactory {
     CloudConfig cloudConfig;
     RealmAwareZkClient dedicatedZkClient = null;
     try {
-      if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED)) {
-        // If the multi ZK config is enabled, use multi-realm mode with DedicatedZkClient
+      if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED) || zkAddress == null) {
+        // If the multi ZK config is enabled or zkAddress is null, use realm-aware mode with
+        // DedicatedZkClient
         try {
           RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig =
               new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder()
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 6562e8f..0e123b2 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -127,8 +127,8 @@ public class ZKHelixAdmin implements HelixAdmin {
 
   /**
    * There are 2 realm-aware modes to connect to ZK:
-   * 1. if system property {@link SystemPropertyKeys#MULTI_ZK_ENABLED} is set to <code>"true"</code>,
-   * it will connect on multi-realm mode;
+   * 1. if system property {@link SystemPropertyKeys#MULTI_ZK_ENABLED} is set to
+   * <code>"true"</code>, or zkAddress is null, it will connect on multi-realm mode;
    * 2. otherwise, it will connect on single-realm mode to the <code>zkAddress</code> provided.
    *
    * @param zkAddress ZK address
@@ -146,7 +146,7 @@ public class ZKHelixAdmin implements HelixAdmin {
 
     RealmAwareZkClient zkClient;
 
-    if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED)) {
+    if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED) || zkAddress == null) {
       try {
         zkClient = new FederatedZkClient(
             new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build(), clientConfig);
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index 18ccbe7..f8967bc 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -131,6 +131,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
   private int _reportLatency;
 
   protected RealmAwareZkClient _zkclient;
+  private RealmAwareZkClient.RealmAwareZkConnectionConfig _realmAwareZkConnectionConfig;
   private final DefaultMessagingService _messagingService;
   private Map<ChangeType, HelixCallbackMonitor> _callbackMonitors;
 
@@ -224,13 +225,14 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
   public ZKHelixManager(String clusterName, String instanceName, InstanceType instanceType,
       String zkAddress, HelixManagerStateListener stateListener,
       HelixManagerProperty helixManagerProperty) {
-
-    LOG.info("Create a zk-based cluster manager. zkSvr: " + zkAddress + ", clusterName: "
-        + clusterName + ", instanceName: " + instanceName + ", type: " + instanceType);
+    validateZkConnectionSettings(zkAddress, helixManagerProperty);
 
     _zkAddress = zkAddress;
     _clusterName = clusterName;
     _instanceType = instanceType;
+    LOG.info("Create a zk-based cluster manager. ZK connection: " + getZkConnectionInfo()
+        + ", clusterName: " + clusterName + ", instanceName: " + instanceName + ", type: "
+        + instanceType);
 
     if (instanceName == null) {
       try {
@@ -318,13 +320,11 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
       _stateMachineEngine = null;
       _participantHealthInfoCollector = null;
       _controllerTimerTasks.add(new StatusDumpTask(this));
-
       break;
     case CONTROLLER_PARTICIPANT:
       _stateMachineEngine = new HelixStateMachineEngine(this);
       _participantHealthInfoCollector =
           new ParticipantHealthReportCollectorImpl(this, _instanceName);
-
       _timerTasks
           .add(new ParticipantHealthReportTask(_participantHealthInfoCollector, _reportLatency));
       _controllerTimerTasks.add(new StatusDumpTask(this));
@@ -348,7 +348,8 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
     _enabledPipelineTypes = types;
   }
 
-  @Override public boolean removeListener(PropertyKey key, Object listener) {
+  @Override
+  public boolean removeListener(PropertyKey key, Object listener) {
     LOG.info("Removing listener: " + listener + " on path: " + key.getPath() + " from cluster: "
         + _clusterName + " by instance: " + _instanceName);
 
@@ -390,15 +391,14 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
 
     boolean isConnected = isConnected();
     if (!isConnected && timeout > 0) {
-      LOG.warn(
-          "zkClient to " + _zkAddress + " is not connected, wait for " + _waitForConnectedTimeout
-              + "ms.");
+      LOG.warn("zkClient to " + getZkConnectionInfo() + " is not connected, wait for "
+          + _waitForConnectedTimeout + "ms.");
       isConnected = _zkclient.waitUntilConnected(_waitForConnectedTimeout, TimeUnit.MILLISECONDS);
     }
 
     if (!isConnected) {
       LOG.error("zkClient is not connected after waiting " + timeout + "ms."
-          + ", clusterName: " + _clusterName + ", zkAddress: " + _zkAddress);
+          + ", clusterName: " + _clusterName + ", zkAddress: " + getZkConnectionInfo());
       throw new HelixException(
           "HelixManager is not connected within retry timeout for cluster " + _clusterName);
     }
@@ -671,10 +671,13 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
   }
 
   /**
+   * Deprecated because ZKHelixManager shouldn't expose ZkAddress in realm-aware mode.
+   *
    * Returns a string that can be used to connect to metadata store for this HelixManager instance
    * i.e. for ZkHelixManager, this will have format "{zookeeper-address}:{port}"
    * @return a string used to connect to metadata store
    */
+  @Deprecated
   @Override
   public String getMetadataStoreConnectionString() {
     return _zkAddress;
@@ -1091,7 +1094,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
       isConnected =
           _zkclient.waitUntilConnected(HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
       if (!isConnected) {
-        LOG.error("fail to connect zkserver: " + _zkAddress + " in "
+        LOG.error("fail to connect zkserver: " + getZkConnectionInfo() + " in "
             + HelixZkClient.DEFAULT_CONNECTION_TIMEOUT + "ms. expiredSessionId: " + _sessionId
             + ", clusterName: " + _clusterName);
         continue;
@@ -1405,11 +1408,15 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
     PathBasedZkSerializer zkSerializer =
         ChainedPathZkSerializer.builder(new ZNRecordSerializer()).build();
 
-    RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig =
-        new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder()
-            .setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
-            .setZkRealmShardingKey(shardingKey)
-            .setSessionTimeout(_sessionTimeout).build();
+    // If the user supplied RealmAwareZkConnectionConfig, then use it. Only create the connection
+    // config if nothing is given
+    if (_realmAwareZkConnectionConfig == null) {
+      // If no connection config is given, use the single realm mode with the cluster name as the
+      // key
+      _realmAwareZkConnectionConfig = new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder()
+          .setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
+          .setZkRealmShardingKey(shardingKey).setSessionTimeout(_sessionTimeout).build();
+    }
 
     RealmAwareZkClient.RealmAwareZkClientConfig clientConfig =
         new RealmAwareZkClient.RealmAwareZkClientConfig();
@@ -1422,27 +1429,27 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
         .setMonitorRootPathOnly(isMonitorRootPathOnly());
 
     if (_instanceType == InstanceType.ADMINISTRATOR) {
-      return resolveZkClient(SharedZkClientFactory.getInstance(), connectionConfig,
+      return resolveZkClient(SharedZkClientFactory.getInstance(), _realmAwareZkConnectionConfig,
           clientConfig);
     }
 
-    return resolveZkClient(DedicatedZkClientFactory.getInstance(), connectionConfig,
+    return resolveZkClient(DedicatedZkClientFactory.getInstance(), _realmAwareZkConnectionConfig,
         clientConfig);
   }
 
   /*
    * Resolves what type of ZkClient this HelixManager should use based on whether MULTI_ZK_ENABLED
    * System config is set or not. Two types of ZkClients are available:
-   * 1) If MULTI_ZK_ENABLED is set to true, we create a dedicated RealmAwareZkClient
-   * that provides full ZkClient functionalities and connects to the correct ZK by querying
-   * MetadataStoreDirectoryService.
+   * 1) If MULTI_ZK_ENABLED is set to true or zkAddress is null, we create a dedicated
+   * RealmAwareZkClient that provides full ZkClient functionalities and connects to the correct ZK
+   * by querying MetadataStoreDirectoryService.
    * 2) Otherwise, we create a dedicated HelixZkClient which plainly connects to
    * the ZK address given.
    */
   private RealmAwareZkClient resolveZkClient(HelixZkClientFactory zkClientFactory,
       RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig,
       RealmAwareZkClient.RealmAwareZkClientConfig clientConfig) {
-    if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED)) {
+    if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED) || _zkAddress == null) {
       try {
         // Create realm-aware ZkClient.
         return zkClientFactory.buildZkClient(connectionConfig, clientConfig);
@@ -1464,4 +1471,48 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
   private String buildShardingKey() {
     return _clusterName.charAt(0) == '/' ? _clusterName : "/" + _clusterName;
   }
+
+  /**
+   * Check that zkAddress and ZkConnectionConfig are not both set.
+   * If zkAddress is not given and ZkConnectionConfig is given, check that ZkConnectionConfig has
+   * a ZK path sharding key set because HelixManager must work on single-realm mode.
+   * @param zkAddress
+   * @param helixManagerProperty
+   */
+  private void validateZkConnectionSettings(String zkAddress,
+      HelixManagerProperty helixManagerProperty) {
+    if (helixManagerProperty != null && helixManagerProperty.getZkConnectionConfig() != null) {
+      if (zkAddress != null) {
+        throw new HelixException(
+            "ZKHelixManager: cannot have both ZkAddress and ZkConnectionConfig set!");
+      }
+      RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig =
+          helixManagerProperty.getZkConnectionConfig();
+      if (connectionConfig.getZkRealmShardingKey() == null || connectionConfig
+          .getZkRealmShardingKey().isEmpty()) {
+        throw new HelixException(
+            "ZKHelixManager::ZK path sharding key must be set for ZKHelixManager! ZKHelixManager "
+                + "is only available on single-realm mode.");
+      }
+      _realmAwareZkConnectionConfig = connectionConfig;
+    }
+  }
+
+  /**
+   * Resolve ZK connection info for logging purposes.
+   * @return
+   */
+  private String getZkConnectionInfo() {
+    String zkConnectionInfo;
+    if (_zkAddress == null) {
+      if (_helixManagerProperty != null && _helixManagerProperty.getZkConnectionConfig() != null) {
+        zkConnectionInfo = _helixManagerProperty.getZkConnectionConfig().toString();
+      } else {
+        zkConnectionInfo = "None";
+      }
+    } else {
+      zkConnectionInfo = _zkAddress;
+    }
+    return zkConnectionInfo;
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
index 61fcba2..cda5f39 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
@@ -614,7 +614,7 @@ public final class ZKUtil {
    * @return
    */
   private static RealmAwareZkClient getHelixZkClient(String zkAddr) {
-    if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED)) {
+    if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED) || zkAddr == null) {
       try {
         // Create realm-aware ZkClient.
         RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig =
@@ -626,8 +626,8 @@ public final class ZKUtil {
         throw new HelixException("Not able to connect on realm-aware mode", e);
       }
     }
-    if (zkAddr == null || zkAddr.isEmpty()) {
-      throw new HelixException("ZK Address given is either null or empty!");
+    if (zkAddr.isEmpty()) {
+      throw new HelixException("ZK Address given is empty!");
     }
     HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
     clientConfig.setZkSerializer(new ZNRecordSerializer());
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
index f31bcc8..d2096b0 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
@@ -1337,7 +1337,7 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
   static RealmAwareZkClient buildRealmAwareZkClientWithDefaultConfigs(
       RealmAwareZkClient.RealmAwareZkClientConfig clientConfig, String zkAddress,
       ZkClientType zkClientType) {
-    if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED)) {
+    if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED) || zkAddress == null) {
       // If the multi ZK config is enabled, use multi-realm mode with FederatedZkClient
       try {
         return new FederatedZkClient(
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
index 51d96e9..8f0aa16 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
@@ -83,7 +83,10 @@ public class ZkBucketDataAccessor implements BucketDataAccessor, AutoCloseable {
    * @param versionTTLms in ms
    */
   public ZkBucketDataAccessor(String zkAddr, int bucketSize, long versionTTLms) {
-    if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED)) {
+    if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED) || zkAddr == null) {
+      LOG.warn(
+          "ZkBucketDataAccessor: either multi-zk enabled or zkAddr is null - "
+              + "starting ZkBucketDataAccessor in multi-zk mode!");
       try {
         // Create realm-aware ZkClient.
         RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig =
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
index 6dbf19c..59180eb 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
@@ -156,7 +156,7 @@ public class ClusterSetup {
   @Deprecated
   public ClusterSetup(String zkServerAddress) {
     // If the multi ZK config is enabled, use FederatedZkClient on multi-realm mode
-    if (Boolean.parseBoolean(System.getProperty(SystemPropertyKeys.MULTI_ZK_ENABLED))) {
+    if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED) || zkServerAddress == null) {
       try {
         _zkClient = new FederatedZkClient(
             new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build(),
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
index 11905bd..98bb835 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
@@ -111,7 +111,7 @@ public abstract class ZkHelixClusterVerifier
       throw new IllegalArgumentException("ZkHelixClusterVerifier: clusterName is null or empty!");
     }
     // If the multi ZK config is enabled, use DedicatedZkClient on multi-realm mode
-    if (Boolean.parseBoolean(System.getProperty(SystemPropertyKeys.MULTI_ZK_ENABLED))) {
+    if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED) || zkAddr == null) {
       try {
         RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder connectionConfigBuilder =
             new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder();
@@ -370,7 +370,7 @@ public abstract class ZkHelixClusterVerifier
     protected RealmAwareZkClient createZkClient(RealmAwareZkClient.RealmMode realmMode,
         RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig,
         RealmAwareZkClient.RealmAwareZkClientConfig clientConfig, String zkAddress) {
-      if (Boolean.parseBoolean(System.getProperty(SystemPropertyKeys.MULTI_ZK_ENABLED))) {
+      if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED) || zkAddress == null) {
         try {
           // First, try to create a RealmAwareZkClient that's a DedicatedZkClient
           return DedicatedZkClientFactory.getInstance()
diff --git a/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java b/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java
index 95a118d..5146ce6 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java
@@ -36,6 +36,9 @@ import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.HelixManagerProperty;
 import org.apache.helix.InstanceType;
 import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.TestHelper;
@@ -346,10 +349,64 @@ public class TestMultiZkHelixJavaApis {
   }
 
   /**
+   * Tests creation of HelixManager and makes sure it connects correctly.
+   */
+  @Test(dependsOnMethods = "testCreateParticipants")
+  public void testZKHelixManager() throws Exception {
+    String clusterName = "CLUSTER_1";
+    String participantName = "HelixManager";
+    InstanceConfig instanceConfig = new InstanceConfig(participantName);
+    _zkHelixAdmin.addInstance(clusterName, instanceConfig);
+
+    RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder connectionConfigBuilder =
+        new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder();
+    // Try with a connection config without ZK realm sharding key set (should fail)
+    RealmAwareZkClient.RealmAwareZkConnectionConfig invalidZkConnectionConfig =
+        connectionConfigBuilder.build();
+    RealmAwareZkClient.RealmAwareZkConnectionConfig validZkConnectionConfig =
+        connectionConfigBuilder.setZkRealmShardingKey("/" + clusterName).build();
+    HelixManagerProperty.Builder propertyBuilder = new HelixManagerProperty.Builder();
+    try {
+      HelixManager invalidManager = HelixManagerFactory
+          .getZKHelixManager(clusterName, participantName, InstanceType.PARTICIPANT, null,
+              propertyBuilder.setRealmAWareZkConnectionConfig(invalidZkConnectionConfig).build());
+      Assert.fail("Should see a HelixException here because the connection config doesn't have the "
+          + "sharding key set!");
+    } catch (HelixException e) {
+      // Expected
+    }
+
+    // Connect as a participant
+    HelixManager managerParticipant = HelixManagerFactory
+        .getZKHelixManager(clusterName, participantName, InstanceType.PARTICIPANT, null,
+            propertyBuilder.setRealmAWareZkConnectionConfig(validZkConnectionConfig).build());
+    managerParticipant.connect();
+
+    // Connect as an administrator
+    HelixManager managerAdministrator = HelixManagerFactory
+        .getZKHelixManager(clusterName, participantName, InstanceType.ADMINISTRATOR, null,
+            propertyBuilder.setRealmAWareZkConnectionConfig(validZkConnectionConfig).build());
+    managerAdministrator.connect();
+
+    // Perform assert checks to make sure the manager can read and register itself as a participant
+    InstanceConfig instanceConfigRead = managerAdministrator.getClusterManagmentTool()
+        .getInstanceConfig(clusterName, participantName);
+    Assert.assertNotNull(instanceConfigRead);
+    Assert.assertEquals(instanceConfig.getInstanceName(), participantName);
+    Assert.assertNotNull(managerAdministrator.getHelixDataAccessor().getProperty(
+        managerAdministrator.getHelixDataAccessor().keyBuilder().liveInstance(participantName)));
+
+    // Clean up
+    managerParticipant.disconnect();
+    managerAdministrator.disconnect();
+    _zkHelixAdmin.dropInstance(clusterName, instanceConfig);
+  }
+
+  /**
    * Test that clusters and instances are set up properly.
    * Helix Java APIs tested in this method is ZkUtil.
    */
-  @Test(dependsOnMethods = "testCreateParticipants")
+  @Test(dependsOnMethods = "testZKHelixManager")
   public void testZkUtil() {
     CLUSTER_LIST.forEach(cluster -> {
       _zkHelixAdmin.getInstancesInCluster(cluster).forEach(instance -> ZKUtil
@@ -820,8 +877,7 @@ public class TestMultiZkHelixJavaApis {
         new ClusterSetup.Builder().setRealmAwareZkConnectionConfig(connectionConfig).build();
 
     try {
-      verifyMsdsZkRealm(CLUSTER_ONE, true,
-          () -> firstClusterSetup.addCluster(CLUSTER_ONE, false));
+      verifyMsdsZkRealm(CLUSTER_ONE, true, () -> firstClusterSetup.addCluster(CLUSTER_ONE, false));
       verifyMsdsZkRealm(CLUSTER_FOUR, false,
           () -> firstClusterSetup.addCluster(CLUSTER_FOUR, false));
 
@@ -856,8 +912,7 @@ public class TestMultiZkHelixJavaApis {
         new ZKHelixAdmin.Builder().setRealmAwareZkConnectionConfig(connectionConfig).build();
 
     try {
-      verifyMsdsZkRealm(CLUSTER_ONE, true,
-          () -> firstHelixAdmin.enableCluster(CLUSTER_ONE, true));
+      verifyMsdsZkRealm(CLUSTER_ONE, true, () -> firstHelixAdmin.enableCluster(CLUSTER_ONE, true));
       verifyMsdsZkRealm(CLUSTER_FOUR, false,
           () -> firstHelixAdmin.enableCluster(CLUSTER_FOUR, true));
 
@@ -880,8 +935,7 @@ public class TestMultiZkHelixJavaApis {
         new ConfigAccessor.Builder().setRealmAwareZkConnectionConfig(connectionConfig).build();
 
     try {
-      verifyMsdsZkRealm(CLUSTER_ONE, true,
-          () -> firstConfigAccessor.getClusterConfig(CLUSTER_ONE));
+      verifyMsdsZkRealm(CLUSTER_ONE, true, () -> firstConfigAccessor.getClusterConfig(CLUSTER_ONE));
       verifyMsdsZkRealm(CLUSTER_FOUR, false,
           () -> firstConfigAccessor.getClusterConfig(CLUSTER_FOUR));