You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2020/04/08 22:54:05 UTC

[helix] 35/50: Make ClusterSetup realm-aware (#861)

This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch zooscalability_merge
in repository https://gitbox.apache.org/repos/asf/helix.git

commit f69ba884cd6828adec12e7ea10264db6338f387f
Author: Hunter Lee <hu...@linkedin.com>
AuthorDate: Wed Mar 11 19:35:42 2020 -0700

    Make ClusterSetup realm-aware (#861)
    
    We make ClusterSetup, a Helix Java API, realm-aware so that this could be used in a multi-ZK environment.
    
    Changelist:
    Add a Builder to enable users to set internal ZkClient parameters
    Add the realm-aware behavior in existing constructors
    Update ConfigAccessor to reflect the change in the logic
---
 .../java/org/apache/helix/SystemPropertyKeys.java  |   6 +
 .../main/java/org/apache/helix/ConfigAccessor.java |  66 ++++++----
 .../java/org/apache/helix/SystemPropertyKeys.java  |  70 ----------
 .../org/apache/helix/manager/zk/ZKHelixAdmin.java  |   5 +-
 .../helix/manager/zk/ZkBaseDataAccessor.java       |   5 +-
 .../java/org/apache/helix/tools/ClusterSetup.java  | 143 ++++++++++++++++++---
 .../zookeeper/api/client/RealmAwareZkClient.java   |   5 +-
 7 files changed, 179 insertions(+), 121 deletions(-)

diff --git a/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java b/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java
index bcb8405..a40dbe9 100644
--- a/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java
+++ b/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java
@@ -26,6 +26,9 @@ public class SystemPropertyKeys {
   // ZKHelixManager
   public static final String CLUSTER_MANAGER_VERSION = "cluster-manager-version.properties";
 
+  // soft constraints weight definitions
+  public static final String SOFT_CONSTRAINT_WEIGHTS = "soft-constraint-weight.properties";
+
   public static final String FLAPPING_TIME_WINDOW = "helixmanager.flappingTimeWindow";
 
   // max disconnect count during the flapping time window to trigger HelixManager flapping handling
@@ -57,4 +60,7 @@ public class SystemPropertyKeys {
 
   // MBean monitor for helix.
   public static final String HELIX_MONITOR_TIME_WINDOW_LENGTH_MS = "helix.monitor.slidingTimeWindow.ms";
+
+  // Multi-ZK mode enable/disable flag
+  public static final String MULTI_ZK_ENABLED = "helix.multiZkEnabled";
 }
diff --git a/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
index 52894b1..896f936 100644
--- a/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
@@ -29,7 +29,6 @@ import java.util.Map;
 import java.util.TreeMap;
 
 import org.apache.helix.manager.zk.ZKUtil;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.CloudConfig;
 import org.apache.helix.model.ConfigScope;
@@ -45,6 +44,7 @@ import org.apache.helix.util.StringTemplate;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
 import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
 import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
 import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
 import org.slf4j.Logger;
@@ -87,18 +87,23 @@ public class ConfigAccessor {
    * Constructor that creates a realm-aware ConfigAccessor using a builder.
    * @param builder
    */
-  private ConfigAccessor(Builder builder) throws IOException, InvalidRoutingDataException {
+  private ConfigAccessor(Builder builder) {
     switch (builder._realmMode) {
       case MULTI_REALM:
-        _zkClient = new FederatedZkClient(builder._realmAwareZkConnectionConfig,
-            builder._realmAwareZkClientConfig);
+        try {
+          _zkClient = new FederatedZkClient(builder._realmAwareZkConnectionConfig,
+              builder._realmAwareZkClientConfig.setZkSerializer(new ZNRecordSerializer()));
+        } catch (IOException | InvalidRoutingDataException | IllegalStateException e) {
+          throw new HelixException("Failed to create ConfigAccessor!", e);
+        }
         break;
       case SINGLE_REALM:
         // Create a HelixZkClient: Use a SharedZkClient because ConfigAccessor does not need to do
         // ephemeral operations
         _zkClient = SharedZkClientFactory.getInstance()
-            .buildZkClient(builder._realmAwareZkConnectionConfig.createZkConnectionConfig(),
-                builder._realmAwareZkClientConfig.createHelixZkClientConfig());
+            .buildZkClient(new HelixZkClient.ZkConnectionConfig(builder._zkAddress),
+                builder._realmAwareZkClientConfig.createHelixZkClientConfig()
+                    .setZkSerializer(new ZNRecordSerializer()));
         break;
       default:
         throw new HelixException("Invalid RealmMode given: " + builder._realmMode);
@@ -124,24 +129,26 @@ public class ConfigAccessor {
    * ConfigAccessor only deals with Helix's data models like ResourceConfig.
    * @param zkAddress
    */
+  @Deprecated
   public ConfigAccessor(String zkAddress) {
-    // First, attempt to connect on multi-realm mode using FederatedZkClient
-    RealmAwareZkClient zkClient;
-    try {
-      zkClient = new FederatedZkClient(
-          new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build(),
-          new RealmAwareZkClient.RealmAwareZkClientConfig());
-    } catch (IOException | InvalidRoutingDataException | IllegalStateException e) {
-      // Connecting multi-realm failed - fall back to creating it on single-realm mode using the given ZK address
-      LOG.info(
-          "ConfigAccessor: not able to connect on multi-realm mode; connecting single-realm mode to ZK: {}",
-          zkAddress, e);
-      zkClient = SharedZkClientFactory.getInstance()
-          .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress),
-              new HelixZkClient.ZkClientConfig().setZkSerializer(new ZNRecordSerializer()));
-    }
-    _zkClient = zkClient;
     _usesExternalZkClient = false;
+
+    // If the multi ZK config is enabled, use FederatedZkClient on multi-realm mode
+    if (Boolean.parseBoolean(System.getProperty(SystemPropertyKeys.MULTI_ZK_ENABLED))) {
+      try {
+        _zkClient = new FederatedZkClient(
+            new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build(),
+            new RealmAwareZkClient.RealmAwareZkClientConfig()
+                .setZkSerializer(new ZNRecordSerializer()));
+        return;
+      } catch (IOException | InvalidRoutingDataException | IllegalStateException e) {
+        throw new HelixException("Failed to create ConfigAccessor!", e);
+      }
+    }
+
+    _zkClient = SharedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress),
+            new HelixZkClient.ZkClientConfig().setZkSerializer(new ZNRecordSerializer()));
   }
 
   /**
@@ -1017,7 +1024,7 @@ public class ConfigAccessor {
       return this;
     }
 
-    public ConfigAccessor build() throws Exception {
+    public ConfigAccessor build() {
       validate();
       return new ConfigAccessor(this);
     }
@@ -1032,17 +1039,20 @@ public class ConfigAccessor {
         throw new HelixException(
             "ConfigAccessor: RealmMode cannot be single-realm without a valid ZkAddress set!");
       }
+      if (_realmMode == RealmAwareZkClient.RealmMode.MULTI_REALM && isZkAddressSet) {
+        throw new HelixException(
+            "ConfigAccessor: You cannot set the ZkAddress on multi-realm mode!");
+      }
+
       if (_realmMode == null) {
         _realmMode = isZkAddressSet ? RealmAwareZkClient.RealmMode.SINGLE_REALM
             : RealmAwareZkClient.RealmMode.MULTI_REALM;
       }
 
       // Resolve RealmAwareZkClientConfig
-      boolean isZkClientConfigSet = _realmAwareZkClientConfig != null;
-      // Resolve which clientConfig to use
-      _realmAwareZkClientConfig =
-          isZkClientConfigSet ? _realmAwareZkClientConfig.createHelixZkClientConfig()
-              : new HelixZkClient.ZkClientConfig().setZkSerializer(new ZNRecordSerializer());
+      if (_realmAwareZkClientConfig == null) {
+        _realmAwareZkClientConfig = new RealmAwareZkClient.RealmAwareZkClientConfig();
+      }
 
       // Resolve RealmAwareZkConnectionConfig
       if (_realmAwareZkConnectionConfig == null) {
diff --git a/helix-core/src/main/java/org/apache/helix/SystemPropertyKeys.java b/helix-core/src/main/java/org/apache/helix/SystemPropertyKeys.java
deleted file mode 100644
index d30a2de..0000000
--- a/helix-core/src/main/java/org/apache/helix/SystemPropertyKeys.java
+++ /dev/null
@@ -1,70 +0,0 @@
-package org.apache.helix;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-public class SystemPropertyKeys {
-  // Used to compose default values in HelixManagerProperty
-  public static final String HELIX_MANAGER_PROPERTIES = "helix-manager.properties";
-
-  public static final String HELIX_MANAGER_VERSION = "clustermanager.version";
-
-  // Used to compose default values in HelixCloudProperty when cloud provider is Azure
-  public static final String AZURE_CLOUD_PROPERTIES = "azure-cloud.properties";
-
-  // Task Driver
-  public static final String TASK_CONFIG_LIMITATION = "helixTask.configsLimitation";
-
-  // ZKHelixManager
-  public static final String CLUSTER_MANAGER_VERSION = "cluster-manager-version.properties";
-  // soft constraints weight definitions
-  public static final String SOFT_CONSTRAINT_WEIGHTS = "soft-constraint-weight.properties";
-
-  public static final String FLAPPING_TIME_WINDOW = "helixmanager.flappingTimeWindow";
-
-  // max disconnect count during the flapping time window to trigger HelixManager flapping handling
-  public static final String MAX_DISCONNECT_THRESHOLD = "helixmanager.maxDisconnectThreshold";
-
-  public static final String ZK_SESSION_TIMEOUT = "zk.session.timeout";
-
-  public static final String ZK_CONNECTION_TIMEOUT = "zk.connection.timeout";
-
-  @Deprecated
-  public static final String ZK_REESTABLISHMENT_CONNECTION_TIMEOUT =
-      "zk.connectionReEstablishment.timeout";
-
-  public static final String ZK_WAIT_CONNECTED_TIMEOUT = "helixmanager.waitForConnectedTimeout";
-
-  public static final String PARTICIPANT_HEALTH_REPORT_LATENCY =
-      "helixmanager.participantHealthReport.reportLatency";
-
-  // Indicate monitoring level of the HelixManager metrics
-  public static final String MONITOR_LEVEL = "helixmanager.monitorLevel";
-
-  // CallbackHandler
-  public static final String ASYNC_BATCH_MODE_ENABLED = "helix.callbackhandler.isAsyncBatchModeEnabled";
-
-  public static final String LEGACY_ASYNC_BATCH_MODE_ENABLED = "isAsyncBatchModeEnabled";
-
-  // Controller
-  public static final String CONTROLLER_MESSAGE_PURGE_DELAY = "helix.controller.stages.MessageGenerationPhase.messagePurgeDelay";
-
-  // MBean monitor for helix.
-  public static final String HELIX_MONITOR_TIME_WINDOW_LENGTH_MS = "helix.monitor.slidingTimeWindow.ms";
-}
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 923fe26..22a427b 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -79,6 +79,7 @@ import org.apache.helix.tools.DefaultIdealStateCalculator;
 import org.apache.helix.util.HelixUtil;
 import org.apache.helix.util.RebalanceUtil;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
 import org.apache.helix.zookeeper.zkclient.DataUpdater;
@@ -94,7 +95,7 @@ public class ZKHelixAdmin implements HelixAdmin {
   private static final String MAINTENANCE_ZNODE_ID = "maintenance";
   private static final int DEFAULT_SUPERCLUSTER_REPLICA = 3;
 
-  private final HelixZkClient _zkClient;
+  private final RealmAwareZkClient _zkClient;
   private final ConfigAccessor _configAccessor;
   // true if ZKHelixAdmin was instantiated with a HelixZkClient, false otherwise
   // This is used for close() to determine how ZKHelixAdmin should close the underlying ZkClient
@@ -103,7 +104,7 @@ public class ZKHelixAdmin implements HelixAdmin {
   private static Logger logger = LoggerFactory.getLogger(ZKHelixAdmin.class);
 
   @Deprecated
-  public ZKHelixAdmin(HelixZkClient zkClient) {
+  public ZKHelixAdmin(RealmAwareZkClient zkClient) {
     _zkClient = zkClient;
     _configAccessor = new ConfigAccessor(zkClient);
     _usesExternalZkClient = true;
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
index bc84a1d..f08ba55 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
@@ -34,6 +34,7 @@ import org.apache.helix.api.exceptions.HelixMetaDataAccessException;
 import org.apache.helix.store.zk.ZNode;
 import org.apache.helix.util.HelixUtil;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
 import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
@@ -102,14 +103,14 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
 
   private static Logger LOG = LoggerFactory.getLogger(ZkBaseDataAccessor.class);
 
-  private final HelixZkClient _zkClient;
+  private final RealmAwareZkClient _zkClient;
   // true if ZkBaseDataAccessor was instantiated with a HelixZkClient, false otherwise
   // This is used for close() to determine how ZkBaseDataAccessor should close the underlying
   // ZkClient
   private final boolean _usesExternalZkClient;
 
   @Deprecated
-  public ZkBaseDataAccessor(HelixZkClient zkClient) {
+  public ZkBaseDataAccessor(RealmAwareZkClient zkClient) {
     if (zkClient == null) {
       throw new NullPointerException("zkclient is null");
     }
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
index 881a8f6..453a2f1 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
@@ -44,12 +44,11 @@ import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.cloud.azure.AzureConstants;
 import org.apache.helix.cloud.constants.CloudProvider;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.zookeeper.api.client.HelixZkClient;
-import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.CloudConfig;
 import org.apache.helix.model.ClusterConfig;
@@ -66,7 +65,14 @@ import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.model.builder.ConstraintItemBuilder;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
 import org.apache.helix.util.HelixUtil;
+import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
+import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
+import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
 import org.apache.helix.zookeeper.zkclient.DataUpdater;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -139,37 +145,74 @@ public class ClusterSetup {
   public static final String removeConstraint = "removeConstraint";
 
   private static final Logger _logger = LoggerFactory.getLogger(ClusterSetup.class);
-  private final String _zkServerAddress;
-  private final HelixZkClient _zkClient;
-  // true if ZkBaseDataAccessor was instantiated with a HelixZkClient, false otherwise
+  private final RealmAwareZkClient _zkClient;
+  // true if ZkBaseDataAccessor was instantiated with a RealmAwareZkClient, false otherwise
   // This is used for close() to determine how ZkBaseDataAccessor should close the underlying
   // ZkClient
   private final boolean _usesExternalZkClient;
   private final HelixAdmin _admin;
 
+  @Deprecated
   public ClusterSetup(String zkServerAddress) {
-    _zkServerAddress = zkServerAddress;
-    _zkClient = SharedZkClientFactory.getInstance()
-        .buildZkClient(new HelixZkClient.ZkConnectionConfig(_zkServerAddress));
-    _zkClient.setZkSerializer(new ZNRecordSerializer());
+    // If the multi ZK config is enabled, use FederatedZkClient on multi-realm mode
+    if (Boolean.parseBoolean(System.getProperty(SystemPropertyKeys.MULTI_ZK_ENABLED))) {
+      try {
+        _zkClient = new FederatedZkClient(
+            new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build(),
+            new RealmAwareZkClient.RealmAwareZkClientConfig()
+                .setZkSerializer(new ZNRecordSerializer()));
+      } catch (IOException | InvalidRoutingDataException | IllegalStateException e) {
+        throw new HelixException("Failed to create ConfigAccessor!", e);
+      }
+    } else {
+      _zkClient = SharedZkClientFactory.getInstance()
+          .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkServerAddress));
+      _zkClient.setZkSerializer(new ZNRecordSerializer());
+    }
+
     _admin = new ZKHelixAdmin(_zkClient);
     _usesExternalZkClient = false;
   }
 
-  public ClusterSetup(HelixZkClient zkClient) {
-    _zkServerAddress = zkClient.getServers();
+  @Deprecated
+  public ClusterSetup(RealmAwareZkClient zkClient) {
     _zkClient = zkClient;
     _admin = new ZKHelixAdmin(_zkClient);
     _usesExternalZkClient = true;
   }
 
-  public ClusterSetup(HelixZkClient zkClient, HelixAdmin zkHelixAdmin) {
-    _zkServerAddress = zkClient.getServers();
+  @Deprecated
+  public ClusterSetup(RealmAwareZkClient zkClient, HelixAdmin zkHelixAdmin) {
     _zkClient = zkClient;
     _admin = zkHelixAdmin;
     _usesExternalZkClient = true;
   }
 
+  private ClusterSetup(Builder builder) {
+    switch (builder._realmMode) {
+      case MULTI_REALM:
+        try {
+          _zkClient = new FederatedZkClient(builder._realmAwareZkConnectionConfig,
+              builder._realmAwareZkClientConfig.setZkSerializer(new ZNRecordSerializer()));
+          break;
+        } catch (IOException | InvalidRoutingDataException | IllegalStateException e) {
+          throw new HelixException("Failed to create ClusterSetup!", e);
+        }
+      case SINGLE_REALM:
+        // Create a HelixZkClient: Use a SharedZkClient because ClusterSetup does not need to do
+        // ephemeral operations
+        _zkClient = SharedZkClientFactory.getInstance()
+            .buildZkClient(new HelixZkClient.ZkConnectionConfig(builder._zkAddress),
+                builder._realmAwareZkClientConfig.createHelixZkClientConfig()
+                    .setZkSerializer(new ZNRecordSerializer()));
+        break;
+      default:
+        throw new HelixException("Invalid RealmMode given: " + builder._realmMode);
+    }
+    _admin = new ZKHelixAdmin(_zkClient);
+    _usesExternalZkClient = false;
+  }
+
   /**
    * Closes any stateful resources in ClusterSetup.
    */
@@ -193,7 +236,7 @@ public class ClusterSetup {
       // If cloud is enabled and Cloud Provider is Azure, populated the Topology information in cluster config
       if (cloudConfig.isCloudEnabled()
           && cloudConfig.getCloudProvider().equals(CloudProvider.AZURE.name())) {
-        ConfigAccessor configAccessor = new ConfigAccessor(_zkServerAddress);
+        ConfigAccessor configAccessor = new ConfigAccessor(_zkClient);
         ClusterConfig clusterConfig = new ClusterConfig(clusterName);
         clusterConfig.setTopology(AzureConstants.AZURE_TOPOLOGY);
         clusterConfig.setTopologyAwareEnabled(true);
@@ -247,7 +290,7 @@ public class ClusterSetup {
   public void dropInstanceFromCluster(String clusterName, String instanceId) {
     ZKHelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
     InstanceConfig instanceConfig = InstanceConfig.toInstanceConfig(instanceId);
     instanceId = instanceConfig.getInstanceName();
@@ -298,7 +341,7 @@ public class ClusterSetup {
 
     ZKHelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
-    Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
     // If new instance config is missing, new instance is not in good state and therefore
     // should not perform swap.
@@ -1592,4 +1635,70 @@ public class ClusterSetup {
     int ret = processCommandLineArgs(args);
     System.exit(ret);
   }
+
+  public static class Builder {
+    private String _zkAddress;
+    private RealmAwareZkClient.RealmMode _realmMode;
+    private RealmAwareZkClient.RealmAwareZkConnectionConfig _realmAwareZkConnectionConfig;
+    private RealmAwareZkClient.RealmAwareZkClientConfig _realmAwareZkClientConfig;
+
+    public Builder() {
+    }
+
+    public Builder setZkAddress(String zkAddress) {
+      _zkAddress = zkAddress;
+      return this;
+    }
+
+    public Builder setRealmMode(RealmAwareZkClient.RealmMode realmMode) {
+      _realmMode = realmMode;
+      return this;
+    }
+
+    public Builder setRealmAwareZkConnectionConfig(
+        RealmAwareZkClient.RealmAwareZkConnectionConfig realmAwareZkConnectionConfig) {
+      _realmAwareZkConnectionConfig = realmAwareZkConnectionConfig;
+      return this;
+    }
+
+    public Builder setRealmAwareZkClientConfig(
+        RealmAwareZkClient.RealmAwareZkClientConfig realmAwareZkClientConfig) {
+      _realmAwareZkClientConfig = realmAwareZkClientConfig;
+      return this;
+    }
+
+    public ClusterSetup build() {
+      validate();
+      return new ClusterSetup(this);
+    }
+
+    private void validate() {
+      // Resolve RealmMode based on other parameters
+      boolean isZkAddressSet = _zkAddress != null && !_zkAddress.isEmpty();
+      if (_realmMode == RealmAwareZkClient.RealmMode.SINGLE_REALM && !isZkAddressSet) {
+        throw new HelixException(
+            "ClusterSetup: RealmMode cannot be single-realm without a valid ZkAddress set!");
+      }
+      if (_realmMode == RealmAwareZkClient.RealmMode.MULTI_REALM && isZkAddressSet) {
+        throw new HelixException(
+            "ClusterSetup: You cannot set the ZkAddress on multi-realm mode!");
+      }
+      if (_realmMode == null) {
+        _realmMode = isZkAddressSet ? RealmAwareZkClient.RealmMode.SINGLE_REALM
+            : RealmAwareZkClient.RealmMode.MULTI_REALM;
+      }
+
+      // Resolve RealmAwareZkClientConfig
+      if (_realmAwareZkClientConfig == null) {
+        _realmAwareZkClientConfig = new RealmAwareZkClient.RealmAwareZkClientConfig();
+      }
+
+      // Resolve RealmAwareZkConnectionConfig
+      if (_realmAwareZkConnectionConfig == null) {
+        // If not set, create a default one
+        _realmAwareZkConnectionConfig =
+            new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build();
+      }
+    }
+  }
 }
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java
index fb10073..40b6c54 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java
@@ -59,7 +59,8 @@ public interface RealmAwareZkClient {
    * MULTI_REALM: CRUD and change subscription are supported. Operations involving EPHEMERAL CreateMode will throw an UnsupportedOperationException.
    */
   enum RealmMode {
-    SINGLE_REALM, MULTI_REALM
+    SINGLE_REALM,
+    MULTI_REALM
   }
 
   int DEFAULT_OPERATION_TIMEOUT = Integer.MAX_VALUE;
@@ -465,7 +466,7 @@ public interface RealmAwareZkClient {
     // Data access configs
     protected long _operationRetryTimeout = DEFAULT_OPERATION_TIMEOUT;
 
-    // Others
+    // Serializer
     protected PathBasedZkSerializer _zkSerializer;
 
     // Monitoring