You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2020/08/13 22:31:31 UTC
[helix] branch master updated: Add HelixManager constructor with
RealmAwareZkConnectionConfig (#1183)
This is an automated email from the ASF dual-hosted git repository.
hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new eeb738d Add HelixManager constructor with RealmAwareZkConnectionConfig (#1183)
eeb738d is described below
commit eeb738dbd8090516bf81b783b41b770774f92a76
Author: Hunter Lee <hu...@linkedin.com>
AuthorDate: Thu Aug 13 15:31:24 2020 -0700
Add HelixManager constructor with RealmAwareZkConnectionConfig (#1183)
Previously, there wasn't a way to use HelixManager with a custom routing data ZK connection config. This new way of constructing HelixManager allows users to use HelixManager with a custom RealmAwareZkConnectionConfig.
This PR also adds a null check for zkAddress for existing Helix APIs so that it will know when to start themselves in a multi-zk mode.
---
.../main/java/org/apache/helix/ConfigAccessor.java | 2 +-
.../java/org/apache/helix/HelixManagerFactory.java | 24 ++++--
.../org/apache/helix/HelixManagerProperty.java | 80 +++++++++++++++---
.../org/apache/helix/HelixPropertyFactory.java | 5 +-
.../org/apache/helix/manager/zk/ZKHelixAdmin.java | 6 +-
.../apache/helix/manager/zk/ZKHelixManager.java | 95 +++++++++++++++++-----
.../java/org/apache/helix/manager/zk/ZKUtil.java | 6 +-
.../helix/manager/zk/ZkBaseDataAccessor.java | 2 +-
.../helix/manager/zk/ZkBucketDataAccessor.java | 5 +-
.../java/org/apache/helix/tools/ClusterSetup.java | 2 +-
.../ClusterVerifiers/ZkHelixClusterVerifier.java | 4 +-
.../multizk/TestMultiZkHelixJavaApis.java | 68 ++++++++++++++--
12 files changed, 238 insertions(+), 61 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
index bc271c5..48bcfd4 100644
--- a/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
@@ -113,7 +113,7 @@ public class ConfigAccessor {
_usesExternalZkClient = false;
// If the multi ZK config is enabled, use FederatedZkClient on multi-realm mode
- if (Boolean.parseBoolean(System.getProperty(SystemPropertyKeys.MULTI_ZK_ENABLED))) {
+ if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED) || zkAddress == null) {
try {
_zkClient = new FederatedZkClient(
new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build(),
diff --git a/helix-core/src/main/java/org/apache/helix/HelixManagerFactory.java b/helix-core/src/main/java/org/apache/helix/HelixManagerFactory.java
index 2933808..7150928 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixManagerFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixManagerFactory.java
@@ -19,19 +19,15 @@ package org.apache.helix;
* under the License.
*/
-/**
- * factory that creates cluster managers
- *
- * for zk-based cluster managers, the getZKXXX(..zkClient) that takes a zkClient parameter
- * are intended for session expiry test purpose
- */
import org.apache.helix.manager.zk.HelixManagerStateListener;
import org.apache.helix.manager.zk.ZKHelixManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
/**
* Obtain one of a set of Helix cluster managers, organized by the backing system.
+ * factory that creates cluster managers.
*/
public final class HelixManagerFactory {
private static final Logger LOG = LoggerFactory.getLogger(HelixManagerFactory.class);
@@ -65,4 +61,20 @@ public final class HelixManagerFactory {
return new ZKHelixManager(clusterName, instanceName, type, zkAddr, stateListener);
}
+ /**
+ * Construct a ZkHelixManager using the HelixManagerProperty instance given.
+ * HelixManagerProperty given must contain a valid ZkConnectionConfig.
+ * @param clusterName
+ * @param instanceName
+ * @param type
+ * @param stateListener
+ * @param helixManagerProperty must contain a valid ZkConnectionConfig
+ * @return
+ */
+ public static HelixManager getZKHelixManager(String clusterName, String instanceName,
+ InstanceType type, HelixManagerStateListener stateListener,
+ HelixManagerProperty helixManagerProperty) {
+ return new ZKHelixManager(clusterName, instanceName, type, null, stateListener,
+ helixManagerProperty);
+ }
}
diff --git a/helix-core/src/main/java/org/apache/helix/HelixManagerProperty.java b/helix-core/src/main/java/org/apache/helix/HelixManagerProperty.java
index 2ec26a7..f566b99 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixManagerProperty.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixManagerProperty.java
@@ -20,32 +20,51 @@ package org.apache.helix;
*/
import java.util.Properties;
+
import org.apache.helix.model.CloudConfig;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
/**
- * Hold Helix manager properties. The manager properties further hold Helix cloud properties
- * and some other properties specific for the manager.
+ * HelixManagerProperty is a general property/config object used for HelixManager creation.
*/
public class HelixManagerProperty {
private static final Logger LOG = LoggerFactory.getLogger(HelixManagerProperty.class.getName());
private String _version;
private long _healthReportLatency;
private HelixCloudProperty _helixCloudProperty;
+ private RealmAwareZkClient.RealmAwareZkConnectionConfig _zkConnectionConfig;
+ private RealmAwareZkClient.RealmAwareZkClientConfig _zkClientConfig;
/**
+ * ** Deprecated - HelixManagerProperty should be a general property/config object used for
+ * HelixManager creation, not tied only to Properties or CloudConfig **
+ *
* Initialize Helix manager property with default value
* @param helixManagerProperties helix manager related properties input as a map
* @param cloudConfig cloudConfig read from Zookeeper
*/
+ @Deprecated
public HelixManagerProperty(Properties helixManagerProperties, CloudConfig cloudConfig) {
_helixCloudProperty = new HelixCloudProperty(cloudConfig);
- setVersion(helixManagerProperties.getProperty(SystemPropertyKeys.HELIX_MANAGER_VERSION));
- setHealthReportLatency(
+ _version = helixManagerProperties.getProperty(SystemPropertyKeys.HELIX_MANAGER_VERSION);
+ _healthReportLatency = Long.parseLong(
helixManagerProperties.getProperty(SystemPropertyKeys.PARTICIPANT_HEALTH_REPORT_LATENCY));
}
+ private HelixManagerProperty(String version, long healthReportLatency,
+ HelixCloudProperty helixCloudProperty,
+ RealmAwareZkClient.RealmAwareZkConnectionConfig zkConnectionConfig,
+ RealmAwareZkClient.RealmAwareZkClientConfig zkClientConfig) {
+ _version = version;
+ _healthReportLatency = healthReportLatency;
+ _helixCloudProperty = helixCloudProperty;
+ _zkConnectionConfig = zkConnectionConfig;
+ _zkClientConfig = zkClientConfig;
+ }
+
public HelixCloudProperty getHelixCloudProperty() {
return _helixCloudProperty;
}
@@ -58,17 +77,54 @@ public class HelixManagerProperty {
return _healthReportLatency;
}
- public void setHelixCloudProperty(HelixCloudProperty helixCloudProperty) {
- _helixCloudProperty = helixCloudProperty;
+ public RealmAwareZkClient.RealmAwareZkConnectionConfig getZkConnectionConfig() {
+ return _zkConnectionConfig;
}
- public void setVersion(String version) {
- _version = version;
+ public RealmAwareZkClient.RealmAwareZkClientConfig getZkClientConfig() {
+ return _zkClientConfig;
}
- public void setHealthReportLatency(String latency) {
- _healthReportLatency = Long.valueOf(latency);
- }
+ public static class Builder {
+ private String _version;
+ private long _healthReportLatency;
+ private HelixCloudProperty _helixCloudProperty;
+ private RealmAwareZkClient.RealmAwareZkConnectionConfig _zkConnectionConfig;
+ private RealmAwareZkClient.RealmAwareZkClientConfig _zkClientConfig;
+
+ public Builder() {
+ }
+
+ public HelixManagerProperty build() {
+ return new HelixManagerProperty(_version, _healthReportLatency, _helixCloudProperty,
+ _zkConnectionConfig, _zkClientConfig);
+ }
+
+ public Builder setVersion(String version) {
+ _version = version;
+ return this;
+ }
- // TODO: migrate all other participant related properties to this file.
+ public Builder setHealthReportLatency(long healthReportLatency) {
+ _healthReportLatency = healthReportLatency;
+ return this;
+ }
+
+ public Builder setHelixCloudProperty(HelixCloudProperty helixCloudProperty) {
+ _helixCloudProperty = helixCloudProperty;
+ return this;
+ }
+
+ public Builder setRealmAWareZkConnectionConfig(
+ RealmAwareZkClient.RealmAwareZkConnectionConfig zkConnectionConfig) {
+ _zkConnectionConfig = zkConnectionConfig;
+ return this;
+ }
+
+ public Builder setRealmAwareZkClientConfig(
+ RealmAwareZkClient.RealmAwareZkClientConfig zkClientConfig) {
+ _zkClientConfig = zkClientConfig;
+ return this;
+ }
+ }
}
diff --git a/helix-core/src/main/java/org/apache/helix/HelixPropertyFactory.java b/helix-core/src/main/java/org/apache/helix/HelixPropertyFactory.java
index cbdc643..fa01d79 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixPropertyFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixPropertyFactory.java
@@ -88,8 +88,9 @@ public final class HelixPropertyFactory {
CloudConfig cloudConfig;
RealmAwareZkClient dedicatedZkClient = null;
try {
- if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED)) {
- // If the multi ZK config is enabled, use multi-realm mode with DedicatedZkClient
+ if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED) || zkAddress == null) {
+ // If the multi ZK config is enabled or zkAddress is null, use realm-aware mode with
+ // DedicatedZkClient
try {
RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig =
new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder()
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 6562e8f..0e123b2 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -127,8 +127,8 @@ public class ZKHelixAdmin implements HelixAdmin {
/**
* There are 2 realm-aware modes to connect to ZK:
- * 1. if system property {@link SystemPropertyKeys#MULTI_ZK_ENABLED} is set to <code>"true"</code>,
- * it will connect on multi-realm mode;
+ * 1. if system property {@link SystemPropertyKeys#MULTI_ZK_ENABLED} is set to <code>"true"</code>
+ * , or zkAddress is null, it will connect on multi-realm mode;
* 2. otherwise, it will connect on single-realm mode to the <code>zkAddress</code> provided.
*
* @param zkAddress ZK address
@@ -146,7 +146,7 @@ public class ZKHelixAdmin implements HelixAdmin {
RealmAwareZkClient zkClient;
- if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED)) {
+ if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED) || zkAddress == null) {
try {
zkClient = new FederatedZkClient(
new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build(), clientConfig);
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index 18ccbe7..f8967bc 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -131,6 +131,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
private int _reportLatency;
protected RealmAwareZkClient _zkclient;
+ private RealmAwareZkClient.RealmAwareZkConnectionConfig _realmAwareZkConnectionConfig;
private final DefaultMessagingService _messagingService;
private Map<ChangeType, HelixCallbackMonitor> _callbackMonitors;
@@ -224,13 +225,14 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
public ZKHelixManager(String clusterName, String instanceName, InstanceType instanceType,
String zkAddress, HelixManagerStateListener stateListener,
HelixManagerProperty helixManagerProperty) {
-
- LOG.info("Create a zk-based cluster manager. zkSvr: " + zkAddress + ", clusterName: "
- + clusterName + ", instanceName: " + instanceName + ", type: " + instanceType);
+ validateZkConnectionSettings(zkAddress, helixManagerProperty);
_zkAddress = zkAddress;
_clusterName = clusterName;
_instanceType = instanceType;
+ LOG.info("Create a zk-based cluster manager. ZK connection: " + getZkConnectionInfo()
+ + ", clusterName: " + clusterName + ", instanceName: " + instanceName + ", type: "
+ + instanceType);
if (instanceName == null) {
try {
@@ -318,13 +320,11 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
_stateMachineEngine = null;
_participantHealthInfoCollector = null;
_controllerTimerTasks.add(new StatusDumpTask(this));
-
break;
case CONTROLLER_PARTICIPANT:
_stateMachineEngine = new HelixStateMachineEngine(this);
_participantHealthInfoCollector =
new ParticipantHealthReportCollectorImpl(this, _instanceName);
-
_timerTasks
.add(new ParticipantHealthReportTask(_participantHealthInfoCollector, _reportLatency));
_controllerTimerTasks.add(new StatusDumpTask(this));
@@ -348,7 +348,8 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
_enabledPipelineTypes = types;
}
- @Override public boolean removeListener(PropertyKey key, Object listener) {
+ @Override
+ public boolean removeListener(PropertyKey key, Object listener) {
LOG.info("Removing listener: " + listener + " on path: " + key.getPath() + " from cluster: "
+ _clusterName + " by instance: " + _instanceName);
@@ -390,15 +391,14 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
boolean isConnected = isConnected();
if (!isConnected && timeout > 0) {
- LOG.warn(
- "zkClient to " + _zkAddress + " is not connected, wait for " + _waitForConnectedTimeout
- + "ms.");
+ LOG.warn("zkClient to " + getZkConnectionInfo() + " is not connected, wait for "
+ + _waitForConnectedTimeout + "ms.");
isConnected = _zkclient.waitUntilConnected(_waitForConnectedTimeout, TimeUnit.MILLISECONDS);
}
if (!isConnected) {
LOG.error("zkClient is not connected after waiting " + timeout + "ms."
- + ", clusterName: " + _clusterName + ", zkAddress: " + _zkAddress);
+ + ", clusterName: " + _clusterName + ", zkAddress: " + getZkConnectionInfo());
throw new HelixException(
"HelixManager is not connected within retry timeout for cluster " + _clusterName);
}
@@ -671,10 +671,13 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
}
/**
+ * Deprecated becuase ZKHelixManager shouldn't expose ZkAddress in realm-aware mode.
+ *
* Returns a string that can be used to connect to metadata store for this HelixManager instance
* i.e. for ZkHelixManager, this will have format "{zookeeper-address}:{port}"
* @return a string used to connect to metadata store
*/
+ @Deprecated
@Override
public String getMetadataStoreConnectionString() {
return _zkAddress;
@@ -1091,7 +1094,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
isConnected =
_zkclient.waitUntilConnected(HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
if (!isConnected) {
- LOG.error("fail to connect zkserver: " + _zkAddress + " in "
+ LOG.error("fail to connect zkserver: " + getZkConnectionInfo() + " in "
+ HelixZkClient.DEFAULT_CONNECTION_TIMEOUT + "ms. expiredSessionId: " + _sessionId
+ ", clusterName: " + _clusterName);
continue;
@@ -1405,11 +1408,15 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
PathBasedZkSerializer zkSerializer =
ChainedPathZkSerializer.builder(new ZNRecordSerializer()).build();
- RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig =
- new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder()
- .setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
- .setZkRealmShardingKey(shardingKey)
- .setSessionTimeout(_sessionTimeout).build();
+ // If the user supplied RealmAwareZkConnectionConfig, then use it. Only create the connection
+ // config if nothing is given
+ if (_realmAwareZkConnectionConfig == null) {
+ // If no connection config is given, use the single realm mode with the cluster name as the
+ // key
+ _realmAwareZkConnectionConfig = new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder()
+ .setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
+ .setZkRealmShardingKey(shardingKey).setSessionTimeout(_sessionTimeout).build();
+ }
RealmAwareZkClient.RealmAwareZkClientConfig clientConfig =
new RealmAwareZkClient.RealmAwareZkClientConfig();
@@ -1422,27 +1429,27 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
.setMonitorRootPathOnly(isMonitorRootPathOnly());
if (_instanceType == InstanceType.ADMINISTRATOR) {
- return resolveZkClient(SharedZkClientFactory.getInstance(), connectionConfig,
+ return resolveZkClient(SharedZkClientFactory.getInstance(), _realmAwareZkConnectionConfig,
clientConfig);
}
- return resolveZkClient(DedicatedZkClientFactory.getInstance(), connectionConfig,
+ return resolveZkClient(DedicatedZkClientFactory.getInstance(), _realmAwareZkConnectionConfig,
clientConfig);
}
/*
* Resolves what type of ZkClient this HelixManager should use based on whether MULTI_ZK_ENABLED
* System config is set or not. Two types of ZkClients are available:
- * 1) If MULTI_ZK_ENABLED is set to true, we create a dedicated RealmAwareZkClient
- * that provides full ZkClient functionalities and connects to the correct ZK by querying
- * MetadataStoreDirectoryService.
+ * 1) If MULTI_ZK_ENABLED is set to true or zkAddress is null, we create a dedicated
+ * RealmAwareZkClient that provides full ZkClient functionalities and connects to the correct ZK
+ * by querying MetadataStoreDirectoryService.
* 2) Otherwise, we create a dedicated HelixZkClient which plainly connects to
* the ZK address given.
*/
private RealmAwareZkClient resolveZkClient(HelixZkClientFactory zkClientFactory,
RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig,
RealmAwareZkClient.RealmAwareZkClientConfig clientConfig) {
- if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED)) {
+ if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED) || _zkAddress == null) {
try {
// Create realm-aware ZkClient.
return zkClientFactory.buildZkClient(connectionConfig, clientConfig);
@@ -1464,4 +1471,48 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
private String buildShardingKey() {
return _clusterName.charAt(0) == '/' ? _clusterName : "/" + _clusterName;
}
+
+ /**
+ * Check that not both zkAddress and ZkConnectionConfig are set.
+ * If zkAddress is not given and ZkConnectionConfig is given, check that ZkConnectionConfig has
+ * a ZK path sharding key set because HelixManager must work on single-realm mode.
+ * @param zkAddress
+ * @param helixManagerProperty
+ */
+ private void validateZkConnectionSettings(String zkAddress,
+ HelixManagerProperty helixManagerProperty) {
+ if (helixManagerProperty != null && helixManagerProperty.getZkConnectionConfig() != null) {
+ if (zkAddress != null) {
+ throw new HelixException(
+ "ZKHelixManager: cannot have both ZkAddress and ZkConnectionConfig set!");
+ }
+ RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig =
+ helixManagerProperty.getZkConnectionConfig();
+ if (connectionConfig.getZkRealmShardingKey() == null || connectionConfig
+ .getZkRealmShardingKey().isEmpty()) {
+ throw new HelixException(
+ "ZKHelixManager::ZK path sharding key must be set for ZKHelixManager! ZKHelixManager "
+ + "is only available on single-realm mode.");
+ }
+ _realmAwareZkConnectionConfig = connectionConfig;
+ }
+ }
+
+ /**
+ * Resolve ZK connection info for logging purposes.
+ * @return
+ */
+ private String getZkConnectionInfo() {
+ String zkConnectionInfo;
+ if (_zkAddress == null) {
+ if (_helixManagerProperty != null && _helixManagerProperty.getZkConnectionConfig() != null) {
+ zkConnectionInfo = _helixManagerProperty.getZkConnectionConfig().toString();
+ } else {
+ zkConnectionInfo = "None";
+ }
+ } else {
+ zkConnectionInfo = _zkAddress;
+ }
+ return zkConnectionInfo;
+ }
}
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
index 61fcba2..cda5f39 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
@@ -614,7 +614,7 @@ public final class ZKUtil {
* @return
*/
private static RealmAwareZkClient getHelixZkClient(String zkAddr) {
- if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED)) {
+ if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED) || zkAddr == null) {
try {
// Create realm-aware ZkClient.
RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig =
@@ -626,8 +626,8 @@ public final class ZKUtil {
throw new HelixException("Not able to connect on realm-aware mode", e);
}
}
- if (zkAddr == null || zkAddr.isEmpty()) {
- throw new HelixException("ZK Address given is either null or empty!");
+ if (zkAddr.isEmpty()) {
+ throw new HelixException("ZK Address given is empty!");
}
HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
clientConfig.setZkSerializer(new ZNRecordSerializer());
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
index f31bcc8..d2096b0 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
@@ -1337,7 +1337,7 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
static RealmAwareZkClient buildRealmAwareZkClientWithDefaultConfigs(
RealmAwareZkClient.RealmAwareZkClientConfig clientConfig, String zkAddress,
ZkClientType zkClientType) {
- if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED)) {
+ if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED) || zkAddress == null) {
// If the multi ZK config is enabled, use multi-realm mode with FederatedZkClient
try {
return new FederatedZkClient(
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
index 51d96e9..8f0aa16 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
@@ -83,7 +83,10 @@ public class ZkBucketDataAccessor implements BucketDataAccessor, AutoCloseable {
* @param versionTTLms in ms
*/
public ZkBucketDataAccessor(String zkAddr, int bucketSize, long versionTTLms) {
- if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED)) {
+ if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED) || zkAddr == null) {
+ LOG.warn(
+ "ZkBucketDataAccessor: either multi-zk enabled or zkAddr is null - "
+ + "starting ZkBucketDataAccessor in multi-zk mode!");
try {
// Create realm-aware ZkClient.
RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig =
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
index 6dbf19c..59180eb 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
@@ -156,7 +156,7 @@ public class ClusterSetup {
@Deprecated
public ClusterSetup(String zkServerAddress) {
// If the multi ZK config is enabled, use FederatedZkClient on multi-realm mode
- if (Boolean.parseBoolean(System.getProperty(SystemPropertyKeys.MULTI_ZK_ENABLED))) {
+ if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED) || zkServerAddress == null) {
try {
_zkClient = new FederatedZkClient(
new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build(),
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
index 11905bd..98bb835 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
@@ -111,7 +111,7 @@ public abstract class ZkHelixClusterVerifier
throw new IllegalArgumentException("ZkHelixClusterVerifier: clusterName is null or empty!");
}
// If the multi ZK config is enabled, use DedicatedZkClient on multi-realm mode
- if (Boolean.parseBoolean(System.getProperty(SystemPropertyKeys.MULTI_ZK_ENABLED))) {
+ if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED) || zkAddr == null) {
try {
RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder connectionConfigBuilder =
new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder();
@@ -370,7 +370,7 @@ public abstract class ZkHelixClusterVerifier
protected RealmAwareZkClient createZkClient(RealmAwareZkClient.RealmMode realmMode,
RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig,
RealmAwareZkClient.RealmAwareZkClientConfig clientConfig, String zkAddress) {
- if (Boolean.parseBoolean(System.getProperty(SystemPropertyKeys.MULTI_ZK_ENABLED))) {
+ if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED) || zkAddress == null) {
try {
// First, try to create a RealmAwareZkClient that's a DedicatedZkClient
return DedicatedZkClientFactory.getInstance()
diff --git a/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java b/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java
index 95a118d..5146ce6 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/multizk/TestMultiZkHelixJavaApis.java
@@ -36,6 +36,9 @@ import org.apache.helix.BaseDataAccessor;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.HelixManagerProperty;
import org.apache.helix.InstanceType;
import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.TestHelper;
@@ -346,10 +349,64 @@ public class TestMultiZkHelixJavaApis {
}
/**
+ * Test creation of HelixManager and makes sure it connects correctly.
+ */
+ @Test(dependsOnMethods = "testCreateParticipants")
+ public void testZKHelixManager() throws Exception {
+ String clusterName = "CLUSTER_1";
+ String participantName = "HelixManager";
+ InstanceConfig instanceConfig = new InstanceConfig(participantName);
+ _zkHelixAdmin.addInstance(clusterName, instanceConfig);
+
+ RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder connectionConfigBuilder =
+ new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder();
+ // Try with a connection config without ZK realm sharding key set (should fail)
+ RealmAwareZkClient.RealmAwareZkConnectionConfig invalidZkConnectionConfig =
+ connectionConfigBuilder.build();
+ RealmAwareZkClient.RealmAwareZkConnectionConfig validZkConnectionConfig =
+ connectionConfigBuilder.setZkRealmShardingKey("/" + clusterName).build();
+ HelixManagerProperty.Builder propertyBuilder = new HelixManagerProperty.Builder();
+ try {
+ HelixManager invalidManager = HelixManagerFactory
+ .getZKHelixManager(clusterName, participantName, InstanceType.PARTICIPANT, null,
+ propertyBuilder.setRealmAWareZkConnectionConfig(invalidZkConnectionConfig).build());
+ Assert.fail("Should see a HelixException here because the connection config doesn't have the "
+ + "sharding key set!");
+ } catch (HelixException e) {
+ // Expected
+ }
+
+ // Connect as a participant
+ HelixManager managerParticipant = HelixManagerFactory
+ .getZKHelixManager(clusterName, participantName, InstanceType.PARTICIPANT, null,
+ propertyBuilder.setRealmAWareZkConnectionConfig(validZkConnectionConfig).build());
+ managerParticipant.connect();
+
+ // Connect as an administrator
+ HelixManager managerAdministrator = HelixManagerFactory
+ .getZKHelixManager(clusterName, participantName, InstanceType.ADMINISTRATOR, null,
+ propertyBuilder.setRealmAWareZkConnectionConfig(validZkConnectionConfig).build());
+ managerAdministrator.connect();
+
+ // Perform assert checks to make sure the manager can read and register itself as a participant
+ InstanceConfig instanceConfigRead = managerAdministrator.getClusterManagmentTool()
+ .getInstanceConfig(clusterName, participantName);
+ Assert.assertNotNull(instanceConfigRead);
+ Assert.assertEquals(instanceConfig.getInstanceName(), participantName);
+ Assert.assertNotNull(managerAdministrator.getHelixDataAccessor().getProperty(
+ managerAdministrator.getHelixDataAccessor().keyBuilder().liveInstance(participantName)));
+
+ // Clean up
+ managerParticipant.disconnect();
+ managerAdministrator.disconnect();
+ _zkHelixAdmin.dropInstance(clusterName, instanceConfig);
+ }
+
+ /**
* Test that clusters and instances are set up properly.
* Helix Java APIs tested in this method is ZkUtil.
*/
- @Test(dependsOnMethods = "testCreateParticipants")
+ @Test(dependsOnMethods = "testZKHelixManager")
public void testZkUtil() {
CLUSTER_LIST.forEach(cluster -> {
_zkHelixAdmin.getInstancesInCluster(cluster).forEach(instance -> ZKUtil
@@ -820,8 +877,7 @@ public class TestMultiZkHelixJavaApis {
new ClusterSetup.Builder().setRealmAwareZkConnectionConfig(connectionConfig).build();
try {
- verifyMsdsZkRealm(CLUSTER_ONE, true,
- () -> firstClusterSetup.addCluster(CLUSTER_ONE, false));
+ verifyMsdsZkRealm(CLUSTER_ONE, true, () -> firstClusterSetup.addCluster(CLUSTER_ONE, false));
verifyMsdsZkRealm(CLUSTER_FOUR, false,
() -> firstClusterSetup.addCluster(CLUSTER_FOUR, false));
@@ -856,8 +912,7 @@ public class TestMultiZkHelixJavaApis {
new ZKHelixAdmin.Builder().setRealmAwareZkConnectionConfig(connectionConfig).build();
try {
- verifyMsdsZkRealm(CLUSTER_ONE, true,
- () -> firstHelixAdmin.enableCluster(CLUSTER_ONE, true));
+ verifyMsdsZkRealm(CLUSTER_ONE, true, () -> firstHelixAdmin.enableCluster(CLUSTER_ONE, true));
verifyMsdsZkRealm(CLUSTER_FOUR, false,
() -> firstHelixAdmin.enableCluster(CLUSTER_FOUR, true));
@@ -880,8 +935,7 @@ public class TestMultiZkHelixJavaApis {
new ConfigAccessor.Builder().setRealmAwareZkConnectionConfig(connectionConfig).build();
try {
- verifyMsdsZkRealm(CLUSTER_ONE, true,
- () -> firstConfigAccessor.getClusterConfig(CLUSTER_ONE));
+ verifyMsdsZkRealm(CLUSTER_ONE, true, () -> firstConfigAccessor.getClusterConfig(CLUSTER_ONE));
verifyMsdsZkRealm(CLUSTER_FOUR, false,
() -> firstConfigAccessor.getClusterConfig(CLUSTER_FOUR));