You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2020/04/01 22:47:30 UTC
[helix] 35/49: Make ClusterSetup realm-aware (#861)
This is an automated email from the ASF dual-hosted git repository.
hulee pushed a commit to branch zooscalability
in repository https://gitbox.apache.org/repos/asf/helix.git
commit dd4c3838ba0ad92cedf6f3a16132d17354843be8
Author: Hunter Lee <hu...@linkedin.com>
AuthorDate: Wed Mar 11 19:35:42 2020 -0700
Make ClusterSetup realm-aware (#861)
We make ClusterSetup, a Helix Java API, realm-aware so that this could be used in a multi-ZK environment.
Changelist:
Add a Builder to enable users to set internal ZkClient parameters
Add the realm-aware behavior in existing constructors
Update ConfigAccessor to reflect the change in the logic
---
.../java/org/apache/helix/SystemPropertyKeys.java | 6 +
.../main/java/org/apache/helix/ConfigAccessor.java | 66 ++++++----
.../java/org/apache/helix/SystemPropertyKeys.java | 62 ---------
.../org/apache/helix/manager/zk/ZKHelixAdmin.java | 5 +-
.../helix/manager/zk/ZkBaseDataAccessor.java | 5 +-
.../java/org/apache/helix/tools/ClusterSetup.java | 143 ++++++++++++++++++---
.../zookeeper/api/client/RealmAwareZkClient.java | 5 +-
7 files changed, 178 insertions(+), 114 deletions(-)
diff --git a/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java b/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java
index bcb8405..a40dbe9 100644
--- a/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java
+++ b/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java
@@ -26,6 +26,9 @@ public class SystemPropertyKeys {
// ZKHelixManager
public static final String CLUSTER_MANAGER_VERSION = "cluster-manager-version.properties";
+ // soft constraints weight definitions
+ public static final String SOFT_CONSTRAINT_WEIGHTS = "soft-constraint-weight.properties";
+
public static final String FLAPPING_TIME_WINDOW = "helixmanager.flappingTimeWindow";
// max disconnect count during the flapping time window to trigger HelixManager flapping handling
@@ -57,4 +60,7 @@ public class SystemPropertyKeys {
// MBean monitor for helix.
public static final String HELIX_MONITOR_TIME_WINDOW_LENGTH_MS = "helix.monitor.slidingTimeWindow.ms";
+
+ // Multi-ZK mode enable/disable flag
+ public static final String MULTI_ZK_ENABLED = "helix.multiZkEnabled";
}
diff --git a/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
index b0c1add..d0b3bba 100644
--- a/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
@@ -29,7 +29,6 @@ import java.util.Map;
import java.util.TreeMap;
import org.apache.helix.manager.zk.ZKUtil;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ConfigScope;
import org.apache.helix.model.HelixConfigScope;
@@ -44,6 +43,7 @@ import org.apache.helix.util.StringTemplate;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
import org.slf4j.Logger;
@@ -86,18 +86,23 @@ public class ConfigAccessor {
* Constructor that creates a realm-aware ConfigAccessor using a builder.
* @param builder
*/
- private ConfigAccessor(Builder builder) throws IOException, InvalidRoutingDataException {
+ private ConfigAccessor(Builder builder) {
switch (builder._realmMode) {
case MULTI_REALM:
- _zkClient = new FederatedZkClient(builder._realmAwareZkConnectionConfig,
- builder._realmAwareZkClientConfig);
+ try {
+ _zkClient = new FederatedZkClient(builder._realmAwareZkConnectionConfig,
+ builder._realmAwareZkClientConfig.setZkSerializer(new ZNRecordSerializer()));
+ } catch (IOException | InvalidRoutingDataException | IllegalStateException e) {
+ throw new HelixException("Failed to create ConfigAccessor!", e);
+ }
break;
case SINGLE_REALM:
// Create a HelixZkClient: Use a SharedZkClient because ConfigAccessor does not need to do
// ephemeral operations
_zkClient = SharedZkClientFactory.getInstance()
- .buildZkClient(builder._realmAwareZkConnectionConfig.createZkConnectionConfig(),
- builder._realmAwareZkClientConfig.createHelixZkClientConfig());
+ .buildZkClient(new HelixZkClient.ZkConnectionConfig(builder._zkAddress),
+ builder._realmAwareZkClientConfig.createHelixZkClientConfig()
+ .setZkSerializer(new ZNRecordSerializer()));
break;
default:
throw new HelixException("Invalid RealmMode given: " + builder._realmMode);
@@ -123,24 +128,26 @@ public class ConfigAccessor {
* ConfigAccessor only deals with Helix's data models like ResourceConfig.
* @param zkAddress
*/
+ @Deprecated
public ConfigAccessor(String zkAddress) {
- // First, attempt to connect on multi-realm mode using FederatedZkClient
- RealmAwareZkClient zkClient;
- try {
- zkClient = new FederatedZkClient(
- new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build(),
- new RealmAwareZkClient.RealmAwareZkClientConfig());
- } catch (IOException | InvalidRoutingDataException | IllegalStateException e) {
- // Connecting multi-realm failed - fall back to creating it on single-realm mode using the given ZK address
- LOG.info(
- "ConfigAccessor: not able to connect on multi-realm mode; connecting single-realm mode to ZK: {}",
- zkAddress, e);
- zkClient = SharedZkClientFactory.getInstance()
- .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress),
- new HelixZkClient.ZkClientConfig().setZkSerializer(new ZNRecordSerializer()));
- }
- _zkClient = zkClient;
_usesExternalZkClient = false;
+
+ // If the multi ZK config is enabled, use FederatedZkClient on multi-realm mode
+ if (Boolean.parseBoolean(System.getProperty(SystemPropertyKeys.MULTI_ZK_ENABLED))) {
+ try {
+ _zkClient = new FederatedZkClient(
+ new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build(),
+ new RealmAwareZkClient.RealmAwareZkClientConfig()
+ .setZkSerializer(new ZNRecordSerializer()));
+ return;
+ } catch (IOException | InvalidRoutingDataException | IllegalStateException e) {
+ throw new HelixException("Failed to create ConfigAccessor!", e);
+ }
+ }
+
+ _zkClient = SharedZkClientFactory.getInstance()
+ .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress),
+ new HelixZkClient.ZkClientConfig().setZkSerializer(new ZNRecordSerializer()));
}
/**
@@ -956,7 +963,7 @@ public class ConfigAccessor {
return this;
}
- public ConfigAccessor build() throws Exception {
+ public ConfigAccessor build() {
validate();
return new ConfigAccessor(this);
}
@@ -971,17 +978,20 @@ public class ConfigAccessor {
throw new HelixException(
"ConfigAccessor: RealmMode cannot be single-realm without a valid ZkAddress set!");
}
+ if (_realmMode == RealmAwareZkClient.RealmMode.MULTI_REALM && isZkAddressSet) {
+ throw new HelixException(
+ "ConfigAccessor: You cannot set the ZkAddress on multi-realm mode!");
+ }
+
if (_realmMode == null) {
_realmMode = isZkAddressSet ? RealmAwareZkClient.RealmMode.SINGLE_REALM
: RealmAwareZkClient.RealmMode.MULTI_REALM;
}
// Resolve RealmAwareZkClientConfig
- boolean isZkClientConfigSet = _realmAwareZkClientConfig != null;
- // Resolve which clientConfig to use
- _realmAwareZkClientConfig =
- isZkClientConfigSet ? _realmAwareZkClientConfig.createHelixZkClientConfig()
- : new HelixZkClient.ZkClientConfig().setZkSerializer(new ZNRecordSerializer());
+ if (_realmAwareZkClientConfig == null) {
+ _realmAwareZkClientConfig = new RealmAwareZkClient.RealmAwareZkClientConfig();
+ }
// Resolve RealmAwareZkConnectionConfig
if (_realmAwareZkConnectionConfig == null) {
diff --git a/helix-core/src/main/java/org/apache/helix/SystemPropertyKeys.java b/helix-core/src/main/java/org/apache/helix/SystemPropertyKeys.java
deleted file mode 100644
index 2d824cb..0000000
--- a/helix-core/src/main/java/org/apache/helix/SystemPropertyKeys.java
+++ /dev/null
@@ -1,62 +0,0 @@
-package org.apache.helix;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-public class SystemPropertyKeys {
- // Task Driver
- public static final String TASK_CONFIG_LIMITATION = "helixTask.configsLimitation";
-
- // ZKHelixManager
- public static final String CLUSTER_MANAGER_VERSION = "cluster-manager-version.properties";
- // soft constraints weight definitions
- public static final String SOFT_CONSTRAINT_WEIGHTS = "soft-constraint-weight.properties";
-
- public static final String FLAPPING_TIME_WINDOW = "helixmanager.flappingTimeWindow";
-
- // max disconnect count during the flapping time window to trigger HelixManager flapping handling
- public static final String MAX_DISCONNECT_THRESHOLD = "helixmanager.maxDisconnectThreshold";
-
- public static final String ZK_SESSION_TIMEOUT = "zk.session.timeout";
-
- public static final String ZK_CONNECTION_TIMEOUT = "zk.connection.timeout";
-
- @Deprecated
- public static final String ZK_REESTABLISHMENT_CONNECTION_TIMEOUT =
- "zk.connectionReEstablishment.timeout";
-
- public static final String ZK_WAIT_CONNECTED_TIMEOUT = "helixmanager.waitForConnectedTimeout";
-
- public static final String PARTICIPANT_HEALTH_REPORT_LATENCY =
- "helixmanager.participantHealthReport.reportLatency";
-
- // Indicate monitoring level of the HelixManager metrics
- public static final String MONITOR_LEVEL = "helixmanager.monitorLevel";
-
- // CallbackHandler
- public static final String ASYNC_BATCH_MODE_ENABLED = "helix.callbackhandler.isAsyncBatchModeEnabled";
-
- public static final String LEGACY_ASYNC_BATCH_MODE_ENABLED = "isAsyncBatchModeEnabled";
-
- // Controller
- public static final String CONTROLLER_MESSAGE_PURGE_DELAY = "helix.controller.stages.MessageGenerationPhase.messagePurgeDelay";
-
- // MBean monitor for helix.
- public static final String HELIX_MONITOR_TIME_WINDOW_LENGTH_MS = "helix.monitor.slidingTimeWindow.ms";
-}
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 77d8103..d7c40ff 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -78,6 +78,7 @@ import org.apache.helix.tools.DefaultIdealStateCalculator;
import org.apache.helix.util.HelixUtil;
import org.apache.helix.util.RebalanceUtil;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
import org.apache.helix.zookeeper.zkclient.DataUpdater;
@@ -93,7 +94,7 @@ public class ZKHelixAdmin implements HelixAdmin {
private static final String MAINTENANCE_ZNODE_ID = "maintenance";
private static final int DEFAULT_SUPERCLUSTER_REPLICA = 3;
- private final HelixZkClient _zkClient;
+ private final RealmAwareZkClient _zkClient;
private final ConfigAccessor _configAccessor;
// true if ZKHelixAdmin was instantiated with a HelixZkClient, false otherwise
// This is used for close() to determine how ZKHelixAdmin should close the underlying ZkClient
@@ -102,7 +103,7 @@ public class ZKHelixAdmin implements HelixAdmin {
private static Logger logger = LoggerFactory.getLogger(ZKHelixAdmin.class);
@Deprecated
- public ZKHelixAdmin(HelixZkClient zkClient) {
+ public ZKHelixAdmin(RealmAwareZkClient zkClient) {
_zkClient = zkClient;
_configAccessor = new ConfigAccessor(zkClient);
_usesExternalZkClient = true;
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
index bc84a1d..f08ba55 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
@@ -34,6 +34,7 @@ import org.apache.helix.api.exceptions.HelixMetaDataAccessException;
import org.apache.helix.store.zk.ZNode;
import org.apache.helix.util.HelixUtil;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
@@ -102,14 +103,14 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
private static Logger LOG = LoggerFactory.getLogger(ZkBaseDataAccessor.class);
- private final HelixZkClient _zkClient;
+ private final RealmAwareZkClient _zkClient;
// true if ZkBaseDataAccessor was instantiated with a HelixZkClient, false otherwise
// This is used for close() to determine how ZkBaseDataAccessor should close the underlying
// ZkClient
private final boolean _usesExternalZkClient;
@Deprecated
- public ZkBaseDataAccessor(HelixZkClient zkClient) {
+ public ZkBaseDataAccessor(RealmAwareZkClient zkClient) {
if (zkClient == null) {
throw new NullPointerException("zkclient is null");
}
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
index 7ac20a1..4fcfc97 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
@@ -39,14 +39,11 @@ import org.apache.commons.cli.ParseException;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixException;
-import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.zookeeper.api.client.HelixZkClient;
-import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ClusterConstraints;
@@ -62,7 +59,14 @@ import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.model.builder.ConstraintItemBuilder;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
import org.apache.helix.util.HelixUtil;
+import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
+import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
+import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
import org.apache.helix.zookeeper.zkclient.DataUpdater;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -135,37 +139,74 @@ public class ClusterSetup {
public static final String removeConstraint = "removeConstraint";
private static final Logger _logger = LoggerFactory.getLogger(ClusterSetup.class);
- private final String _zkServerAddress;
- private final HelixZkClient _zkClient;
- // true if ZkBaseDataAccessor was instantiated with a HelixZkClient, false otherwise
+ private final RealmAwareZkClient _zkClient;
+ // true if ZkBaseDataAccessor was instantiated with a RealmAwareZkClient, false otherwise
// This is used for close() to determine how ZkBaseDataAccessor should close the underlying
// ZkClient
private final boolean _usesExternalZkClient;
private final HelixAdmin _admin;
+ @Deprecated
public ClusterSetup(String zkServerAddress) {
- _zkServerAddress = zkServerAddress;
- _zkClient = SharedZkClientFactory.getInstance()
- .buildZkClient(new HelixZkClient.ZkConnectionConfig(_zkServerAddress));
- _zkClient.setZkSerializer(new ZNRecordSerializer());
+ // If the multi ZK config is enabled, use FederatedZkClient on multi-realm mode
+ if (Boolean.parseBoolean(System.getProperty(SystemPropertyKeys.MULTI_ZK_ENABLED))) {
+ try {
+ _zkClient = new FederatedZkClient(
+ new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build(),
+ new RealmAwareZkClient.RealmAwareZkClientConfig()
+ .setZkSerializer(new ZNRecordSerializer()));
+ } catch (IOException | InvalidRoutingDataException | IllegalStateException e) {
+ throw new HelixException("Failed to create ConfigAccessor!", e);
+ }
+ } else {
+ _zkClient = SharedZkClientFactory.getInstance()
+ .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkServerAddress));
+ _zkClient.setZkSerializer(new ZNRecordSerializer());
+ }
+
_admin = new ZKHelixAdmin(_zkClient);
_usesExternalZkClient = false;
}
- public ClusterSetup(HelixZkClient zkClient) {
- _zkServerAddress = zkClient.getServers();
+ @Deprecated
+ public ClusterSetup(RealmAwareZkClient zkClient) {
_zkClient = zkClient;
_admin = new ZKHelixAdmin(_zkClient);
_usesExternalZkClient = true;
}
- public ClusterSetup(HelixZkClient zkClient, HelixAdmin zkHelixAdmin) {
- _zkServerAddress = zkClient.getServers();
+ @Deprecated
+ public ClusterSetup(RealmAwareZkClient zkClient, HelixAdmin zkHelixAdmin) {
_zkClient = zkClient;
_admin = zkHelixAdmin;
_usesExternalZkClient = true;
}
+ private ClusterSetup(Builder builder) {
+ switch (builder._realmMode) {
+ case MULTI_REALM:
+ try {
+ _zkClient = new FederatedZkClient(builder._realmAwareZkConnectionConfig,
+ builder._realmAwareZkClientConfig.setZkSerializer(new ZNRecordSerializer()));
+ break;
+ } catch (IOException | InvalidRoutingDataException | IllegalStateException e) {
+ throw new HelixException("Failed to create ClusterSetup!", e);
+ }
+ case SINGLE_REALM:
+ // Create a HelixZkClient: Use a SharedZkClient because ClusterSetup does not need to do
+ // ephemeral operations
+ _zkClient = SharedZkClientFactory.getInstance()
+ .buildZkClient(new HelixZkClient.ZkConnectionConfig(builder._zkAddress),
+ builder._realmAwareZkClientConfig.createHelixZkClientConfig()
+ .setZkSerializer(new ZNRecordSerializer()));
+ break;
+ default:
+ throw new HelixException("Invalid RealmMode given: " + builder._realmMode);
+ }
+ _admin = new ZKHelixAdmin(_zkClient);
+ _usesExternalZkClient = false;
+ }
+
/**
* Closes any stateful resources in ClusterSetup.
*/
@@ -225,7 +266,7 @@ public class ClusterSetup {
public void dropInstanceFromCluster(String clusterName, String instanceId) {
ZKHelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
InstanceConfig instanceConfig = InstanceConfig.toInstanceConfig(instanceId);
instanceId = instanceConfig.getInstanceName();
@@ -276,7 +317,7 @@ public class ClusterSetup {
ZKHelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
// If new instance config is missing, new instance is not in good state and therefore
// should not perform swap.
@@ -1570,4 +1611,70 @@ public class ClusterSetup {
int ret = processCommandLineArgs(args);
System.exit(ret);
}
+
+ public static class Builder {
+ private String _zkAddress;
+ private RealmAwareZkClient.RealmMode _realmMode;
+ private RealmAwareZkClient.RealmAwareZkConnectionConfig _realmAwareZkConnectionConfig;
+ private RealmAwareZkClient.RealmAwareZkClientConfig _realmAwareZkClientConfig;
+
+ public Builder() {
+ }
+
+ public Builder setZkAddress(String zkAddress) {
+ _zkAddress = zkAddress;
+ return this;
+ }
+
+ public Builder setRealmMode(RealmAwareZkClient.RealmMode realmMode) {
+ _realmMode = realmMode;
+ return this;
+ }
+
+ public Builder setRealmAwareZkConnectionConfig(
+ RealmAwareZkClient.RealmAwareZkConnectionConfig realmAwareZkConnectionConfig) {
+ _realmAwareZkConnectionConfig = realmAwareZkConnectionConfig;
+ return this;
+ }
+
+ public Builder setRealmAwareZkClientConfig(
+ RealmAwareZkClient.RealmAwareZkClientConfig realmAwareZkClientConfig) {
+ _realmAwareZkClientConfig = realmAwareZkClientConfig;
+ return this;
+ }
+
+ public ClusterSetup build() {
+ validate();
+ return new ClusterSetup(this);
+ }
+
+ private void validate() {
+ // Resolve RealmMode based on other parameters
+ boolean isZkAddressSet = _zkAddress != null && !_zkAddress.isEmpty();
+ if (_realmMode == RealmAwareZkClient.RealmMode.SINGLE_REALM && !isZkAddressSet) {
+ throw new HelixException(
+ "ClusterSetup: RealmMode cannot be single-realm without a valid ZkAddress set!");
+ }
+ if (_realmMode == RealmAwareZkClient.RealmMode.MULTI_REALM && isZkAddressSet) {
+ throw new HelixException(
+ "ClusterSetup: You cannot set the ZkAddress on multi-realm mode!");
+ }
+ if (_realmMode == null) {
+ _realmMode = isZkAddressSet ? RealmAwareZkClient.RealmMode.SINGLE_REALM
+ : RealmAwareZkClient.RealmMode.MULTI_REALM;
+ }
+
+ // Resolve RealmAwareZkClientConfig
+ if (_realmAwareZkClientConfig == null) {
+ _realmAwareZkClientConfig = new RealmAwareZkClient.RealmAwareZkClientConfig();
+ }
+
+ // Resolve RealmAwareZkConnectionConfig
+ if (_realmAwareZkConnectionConfig == null) {
+ // If not set, create a default one
+ _realmAwareZkConnectionConfig =
+ new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build();
+ }
+ }
+ }
}
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java
index fb10073..40b6c54 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/api/client/RealmAwareZkClient.java
@@ -59,7 +59,8 @@ public interface RealmAwareZkClient {
* MULTI_REALM: CRUD and change subscription are supported. Operations involving EPHEMERAL CreateMode will throw an UnsupportedOperationException.
*/
enum RealmMode {
- SINGLE_REALM, MULTI_REALM
+ SINGLE_REALM,
+ MULTI_REALM
}
int DEFAULT_OPERATION_TIMEOUT = Integer.MAX_VALUE;
@@ -465,7 +466,7 @@ public interface RealmAwareZkClient {
// Data access configs
protected long _operationRetryTimeout = DEFAULT_OPERATION_TIMEOUT;
- // Others
+ // Serializer
protected PathBasedZkSerializer _zkSerializer;
// Monitoring