You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2020/04/08 22:54:09 UTC
[helix] 39/50: Make ZKHelixAdmin and ZKHelixManager Realm-aware
(#846)
This is an automated email from the ASF dual-hosted git repository.
hulee pushed a commit to branch zooscalability_merge
in repository https://gitbox.apache.org/repos/asf/helix.git
commit 3c300be5dcfa4bb2b27d3f9c298b9fd56a65b54e
Author: Huizhi Lu <ih...@gmail.com>
AuthorDate: Thu Mar 12 09:44:38 2020 -0700
Make ZKHelixAdmin and ZKHelixManager Realm-aware (#846)
To make Helix Java APIs realm-aware, we need to make both ZKHelixAdmin and ZKHelixManager realm-aware. This commit adds a Builder to set client config and connection config for building realm-aware ZkClients underneath.
---
.../java/org/apache/helix/SystemPropertyKeys.java | 8 +
.../apache/helix/manager/zk/CallbackHandler.java | 7 +-
.../helix/manager/zk/ParticipantManager.java | 7 +-
.../org/apache/helix/manager/zk/ZKHelixAdmin.java | 206 +++++++++++++++++----
.../apache/helix/manager/zk/ZKHelixManager.java | 102 +++++++---
.../test/java/org/apache/helix/ZkTestHelper.java | 21 ++-
.../integration/TestResourceGroupEndtoEnd.java | 3 +-
.../controller/TestControllerLeadershipChange.java | 4 +-
.../manager/ClusterControllerManager.java | 3 +-
.../manager/ClusterDistributedController.java | 3 +-
.../manager/MockParticipantManager.java | 3 +-
.../helix/integration/manager/ZkTestManager.java | 5 +-
.../apache/helix/manager/zk/TestHandleSession.java | 5 +-
.../impl/factory/HelixZkClientFactory.java | 2 +-
14 files changed, 292 insertions(+), 87 deletions(-)
diff --git a/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java b/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java
index a40dbe9..3b9d4a4 100644
--- a/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java
+++ b/helix-common/src/main/java/org/apache/helix/SystemPropertyKeys.java
@@ -20,6 +20,14 @@ package org.apache.helix;
*/
public class SystemPropertyKeys {
+ // Used to compose default values in HelixManagerProperty
+ public static final String HELIX_MANAGER_PROPERTIES = "helix-manager.properties";
+
+ public static final String HELIX_MANAGER_VERSION = "clustermanager.version";
+
+ // Used to compose default values in HelixCloudProperty when cloud provider is Azure
+ public static final String AZURE_CLOUD_PROPERTIES = "azure-cloud.properties";
+
// Task Driver
public static final String TASK_CONFIG_LIMITATION = "helixTask.configsLimitation";
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
index 82c9b29..bbb8788 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
@@ -64,6 +64,7 @@ import org.apache.helix.model.Message;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.monitoring.mbeans.HelixCallbackMonitor;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.zkclient.IZkChildListener;
import org.apache.helix.zookeeper.zkclient.IZkDataListener;
@@ -107,7 +108,7 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
private final Set<EventType> _eventTypes;
private final HelixDataAccessor _accessor;
private final ChangeType _changeType;
- private final HelixZkClient _zkClient;
+ private final RealmAwareZkClient _zkClient;
private final AtomicLong _lastNotificationTimeStamp;
private final HelixManager _manager;
private final PropertyKey _propertyKey;
@@ -191,12 +192,12 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
*/
private List<NotificationContext.Type> _expectTypes = nextNotificationType.get(Type.FINALIZE);
- public CallbackHandler(HelixManager manager, HelixZkClient client, PropertyKey propertyKey,
+ public CallbackHandler(HelixManager manager, RealmAwareZkClient client, PropertyKey propertyKey,
Object listener, EventType[] eventTypes, ChangeType changeType) {
this(manager, client, propertyKey, listener, eventTypes, changeType, null);
}
- public CallbackHandler(HelixManager manager, HelixZkClient client, PropertyKey propertyKey,
+ public CallbackHandler(HelixManager manager, RealmAwareZkClient client, PropertyKey propertyKey,
Object listener, EventType[] eventTypes, ChangeType changeType,
HelixCallbackMonitor monitor) {
if (listener == null) {
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
index 9eef47e..8bb54ff 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
@@ -54,6 +54,7 @@ import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.participant.statemachine.ScheduledTaskStateModelFactory;
import org.apache.helix.util.HelixUtil;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.datamodel.ZNRecordBucketizer;
import org.apache.helix.zookeeper.zkclient.DataUpdater;
@@ -70,7 +71,7 @@ public class ParticipantManager {
private static Logger LOG = LoggerFactory.getLogger(ParticipantManager.class);
private static final String CLOUD_PROCESSOR_PATH_PREFIX = "org.apache.helix.cloud.";
- final HelixZkClient _zkclient;
+ final RealmAwareZkClient _zkclient;
final HelixManager _manager;
final PropertyKey.Builder _keyBuilder;
final String _clusterName;
@@ -91,14 +92,14 @@ public class ParticipantManager {
private final String _sessionId;
@Deprecated
- public ParticipantManager(HelixManager manager, HelixZkClient zkclient, int sessionTimeout,
+ public ParticipantManager(HelixManager manager, RealmAwareZkClient zkclient, int sessionTimeout,
LiveInstanceInfoProvider liveInstanceInfoProvider, List<PreConnectCallback> preConnectCallbacks,
final String sessionId) {
this(manager, zkclient, sessionTimeout, liveInstanceInfoProvider, preConnectCallbacks,
sessionId, null);
}
- public ParticipantManager(HelixManager manager, HelixZkClient zkclient, int sessionTimeout,
+ public ParticipantManager(HelixManager manager, RealmAwareZkClient zkclient, int sessionTimeout,
LiveInstanceInfoProvider liveInstanceInfoProvider,
List<PreConnectCallback> preConnectCallbacks, final String sessionId,
HelixManagerProperty helixManagerProperty) {
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 22a427b..7beaed8 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -47,9 +47,9 @@ import org.apache.helix.HelixDefinedState;
import org.apache.helix.HelixException;
import org.apache.helix.InstanceType;
import org.apache.helix.PropertyKey;
-import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.PropertyPathBuilder;
import org.apache.helix.PropertyType;
+import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
@@ -75,12 +75,14 @@ import org.apache.helix.model.Message.MessageType;
import org.apache.helix.model.PauseSignal;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
import org.apache.helix.tools.DefaultIdealStateCalculator;
import org.apache.helix.util.HelixUtil;
import org.apache.helix.util.RebalanceUtil;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
import org.apache.helix.zookeeper.zkclient.DataUpdater;
import org.apache.helix.zookeeper.zkclient.exception.ZkException;
@@ -91,18 +93,27 @@ import org.slf4j.LoggerFactory;
public class ZKHelixAdmin implements HelixAdmin {
+ private static final Logger LOG = LoggerFactory.getLogger(ZKHelixAdmin.class);
+
public static final String CONNECTION_TIMEOUT = "helixAdmin.timeOutInSec";
private static final String MAINTENANCE_ZNODE_ID = "maintenance";
private static final int DEFAULT_SUPERCLUSTER_REPLICA = 3;
private final RealmAwareZkClient _zkClient;
private final ConfigAccessor _configAccessor;
- // true if ZKHelixAdmin was instantiated with a HelixZkClient, false otherwise
+ // true if ZKHelixAdmin was instantiated with a RealmAwareZkClient, false otherwise
// This is used for close() to determine how ZKHelixAdmin should close the underlying ZkClient
private final boolean _usesExternalZkClient;
private static Logger logger = LoggerFactory.getLogger(ZKHelixAdmin.class);
+ /**
+ * @deprecated it is recommended to use the builder constructor {@link Builder}
+ * instead to avoid having to manually create and maintain a RealmAwareZkClient
+ * outside of ZKHelixAdmin.
+ *
+ * @param zkClient A created RealmAwareZkClient
+ */
@Deprecated
public ZKHelixAdmin(RealmAwareZkClient zkClient) {
_zkClient = zkClient;
@@ -110,14 +121,69 @@ public class ZKHelixAdmin implements HelixAdmin {
_usesExternalZkClient = true;
}
+ /**
+ * There are 2 realm-aware modes to connect to ZK:
+ * 1. if system property {@link SystemPropertyKeys#MULTI_ZK_ENABLED} is set to <code>"true"</code>,
+ * it will connect on multi-realm mode;
+ * 2. otherwise, it will connect on single-realm mode to the <code>zkAddress</code> provided.
+ *
+ * @param zkAddress ZK address
+ * @exception HelixException if not able to connect on multi-realm mode
+ *
+ * @deprecated it is recommended to use the builder constructor {@link Builder}
+ */
+ @Deprecated
public ZKHelixAdmin(String zkAddress) {
int timeOutInSec = Integer.parseInt(System.getProperty(CONNECTION_TIMEOUT, "30"));
- HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
- clientConfig.setZkSerializer(new ZNRecordSerializer())
- .setConnectInitTimeout(timeOutInSec * 1000);
- _zkClient = SharedZkClientFactory.getInstance()
- .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress), clientConfig);
- _zkClient.waitUntilConnected(timeOutInSec, TimeUnit.SECONDS);
+ RealmAwareZkClient.RealmAwareZkClientConfig clientConfig =
+ new RealmAwareZkClient.RealmAwareZkClientConfig()
+ .setConnectInitTimeout(timeOutInSec * 1000L)
+ .setZkSerializer(new ZNRecordSerializer());
+
+ RealmAwareZkClient zkClient;
+
+ if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED)) {
+ try {
+ zkClient = new FederatedZkClient(
+ new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build(), clientConfig);
+ } catch (IllegalStateException | IOException | InvalidRoutingDataException e) {
+ throw new HelixException("Not able to connect on multi-realm mode.", e);
+ }
+ } else {
+ zkClient = SharedZkClientFactory.getInstance()
+ .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress),
+ clientConfig.createHelixZkClientConfig());
+ zkClient.waitUntilConnected(timeOutInSec, TimeUnit.SECONDS);
+ }
+
+ _zkClient = zkClient;
+ _configAccessor = new ConfigAccessor(_zkClient);
+ _usesExternalZkClient = false;
+ }
+
+ private ZKHelixAdmin(Builder builder) {
+ RealmAwareZkClient zkClient;
+ switch (builder.realmMode) {
+ case MULTI_REALM:
+ try {
+ zkClient = new FederatedZkClient(builder.realmAwareZkConnectionConfig,
+ builder.realmAwareZkClientConfig);
+ } catch (IOException | InvalidRoutingDataException | IllegalStateException e) {
+ throw new HelixException("Not able to connect on multi-realm mode.", e);
+ }
+ break;
+ case SINGLE_REALM:
+ // Create a HelixZkClient: Use a SharedZkClient because ZKHelixAdmin does not need to do
+ // ephemeral operations
+ zkClient = SharedZkClientFactory.getInstance()
+ .buildZkClient(new HelixZkClient.ZkConnectionConfig(builder.zkAddress),
+ builder.realmAwareZkClientConfig.createHelixZkClientConfig());
+ break;
+ default:
+ throw new HelixException("Invalid RealmMode given: " + builder.realmMode);
+ }
+
+ _zkClient = zkClient;
_configAccessor = new ConfigAccessor(_zkClient);
_usesExternalZkClient = false;
}
@@ -207,7 +273,7 @@ public class ZKHelixAdmin implements HelixAdmin {
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
return accessor.getProperty(keyBuilder.instanceConfig(instanceName));
}
@@ -383,7 +449,7 @@ public class ZKHelixAdmin implements HelixAdmin {
reason == null ? "NULL" : reason);
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
if (enabled) {
accessor.removeProperty(keyBuilder.pause());
@@ -408,7 +474,7 @@ public class ZKHelixAdmin implements HelixAdmin {
public boolean isInMaintenanceMode(String clusterName) {
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
return accessor.getBaseDataAccessor()
.exists(keyBuilder.maintenance().getPath(), AccessOption.PERSISTENT);
}
@@ -449,7 +515,7 @@ public class ZKHelixAdmin implements HelixAdmin {
final MaintenanceSignal.TriggeringEntity triggeringEntity) {
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
logger.info("Cluster {} {} {} maintenance mode for reason {}.", clusterName,
triggeringEntity == MaintenanceSignal.TriggeringEntity.CONTROLLER ? "automatically"
: "manually", enabled ? "enters" : "exits", reason == null ? "NULL" : reason);
@@ -517,7 +583,7 @@ public class ZKHelixAdmin implements HelixAdmin {
instanceName, clusterName);
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
// check the instance is alive
LiveInstance liveInstance = accessor.getProperty(keyBuilder.liveInstance(instanceName));
@@ -635,7 +701,7 @@ public class ZKHelixAdmin implements HelixAdmin {
instanceNames == null ? "NULL" : HelixUtil.serializeByComma(instanceNames), clusterName);
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
List<ExternalView> extViews = accessor.getChildValues(keyBuilder.externalViews());
Set<String> resetInstanceNames = new HashSet<String>(instanceNames);
@@ -663,7 +729,7 @@ public class ZKHelixAdmin implements HelixAdmin {
resourceNames == null ? "NULL" : HelixUtil.serializeByComma(resourceNames), clusterName);
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
List<ExternalView> extViews = accessor.getChildValues(keyBuilder.externalViews());
Set<String> resetResourceNames = new HashSet<String>(resourceNames);
@@ -791,7 +857,7 @@ public class ZKHelixAdmin implements HelixAdmin {
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
for (String instanceName : instances) {
InstanceConfig config = accessor.getProperty(keyBuilder.instanceConfig(instanceName));
@@ -910,7 +976,7 @@ public class ZKHelixAdmin implements HelixAdmin {
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
for (String resourceName : getResourcesInCluster(clusterName)) {
IdealState is = accessor.getProperty(keyBuilder.idealStates(resourceName));
@@ -926,7 +992,7 @@ public class ZKHelixAdmin implements HelixAdmin {
public IdealState getResourceIdealState(String clusterName, String resourceName) {
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
return accessor.getProperty(keyBuilder.idealStates(resourceName));
}
@@ -939,7 +1005,7 @@ public class ZKHelixAdmin implements HelixAdmin {
clusterName, idealState == null ? "NULL" : idealState.toString());
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
accessor.setProperty(keyBuilder.idealStates(resourceName), idealState);
}
@@ -982,7 +1048,7 @@ public class ZKHelixAdmin implements HelixAdmin {
public ExternalView getResourceExternalView(String clusterName, String resourceName) {
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
return accessor.getProperty(keyBuilder.externalView(resourceName));
}
@@ -1016,7 +1082,7 @@ public class ZKHelixAdmin implements HelixAdmin {
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
accessor.setProperty(keyBuilder.stateModelDef(stateModelDef), stateModel);
}
@@ -1025,7 +1091,7 @@ public class ZKHelixAdmin implements HelixAdmin {
logger.info("Drop resource {} from cluster {}", resourceName, clusterName);
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
accessor.removeProperty(keyBuilder.idealStates(resourceName));
accessor.removeProperty(keyBuilder.resourceConfig(resourceName));
@@ -1045,7 +1111,7 @@ public class ZKHelixAdmin implements HelixAdmin {
ZKHelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
accessor.setProperty(keyBuilder.cloudConfig(), cloudConfigBuilder);
}
@@ -1054,7 +1120,7 @@ public class ZKHelixAdmin implements HelixAdmin {
logger.info("Remove Cloud Config for cluster {}.", clusterName);
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
accessor.removeProperty(keyBuilder.cloudConfig());
}
@@ -1067,7 +1133,7 @@ public class ZKHelixAdmin implements HelixAdmin {
public StateModelDefinition getStateModelDef(String clusterName, String stateModelName) {
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
return accessor.getProperty(keyBuilder.stateModelDef(stateModelName));
}
@@ -1077,7 +1143,7 @@ public class ZKHelixAdmin implements HelixAdmin {
logger.info("Deleting cluster {}.", clusterName);
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
String root = "/" + clusterName;
if (accessor.getChildNames(keyBuilder.liveInstances()).size() > 0) {
@@ -1121,7 +1187,7 @@ public class ZKHelixAdmin implements HelixAdmin {
ZKHelixDataAccessor accessor =
new ZKHelixDataAccessor(grandCluster, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
accessor.setProperty(keyBuilder.idealStates(idealState.getResourceName()), idealState);
}
@@ -1316,7 +1382,7 @@ public class ZKHelixAdmin implements HelixAdmin {
constraintId, clusterName);
BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
- Builder keyBuilder = new Builder(clusterName);
+ PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
String path = keyBuilder.constraint(constraintType.toString()).getPath();
baseAccessor.update(path, new DataUpdater<ZNRecord>() {
@@ -1339,7 +1405,7 @@ public class ZKHelixAdmin implements HelixAdmin {
constraintId, clusterName);
BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
- Builder keyBuilder = new Builder(clusterName);
+ PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
String path = keyBuilder.constraint(constraintType.toString()).getPath();
baseAccessor.update(path, new DataUpdater<ZNRecord>() {
@@ -1361,7 +1427,7 @@ public class ZKHelixAdmin implements HelixAdmin {
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = new Builder(clusterName);
+ PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
return accessor.getProperty(keyBuilder.constraint(constraintType.toString()));
}
@@ -1443,7 +1509,7 @@ public class ZKHelixAdmin implements HelixAdmin {
}
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
InstanceConfig config = accessor.getProperty(keyBuilder.instanceConfig(instanceName));
config.addTag(tag);
@@ -1464,7 +1530,7 @@ public class ZKHelixAdmin implements HelixAdmin {
}
ZKHelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
InstanceConfig config = accessor.getProperty(keyBuilder.instanceConfig(instanceName));
config.removeTag(tag);
@@ -1485,7 +1551,7 @@ public class ZKHelixAdmin implements HelixAdmin {
}
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
InstanceConfig config = accessor.getProperty(keyBuilder.instanceConfig(instanceName));
config.setZoneId(zoneId);
@@ -1677,7 +1743,7 @@ public class ZKHelixAdmin implements HelixAdmin {
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
List<IdealState> idealStates = accessor.getChildValues(keyBuilder.idealStates());
List<String> nullIdealStates = new ArrayList<>();
for (int i = 0; i < idealStates.size(); i++) {
@@ -1718,7 +1784,7 @@ public class ZKHelixAdmin implements HelixAdmin {
// Ensure that all instances are valid
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
List<String> instances = accessor.getChildNames(keyBuilder.instanceConfigs());
if (validateInstancesForWagedRebalance(clusterName, instances).containsValue(false)) {
throw new HelixException(String
@@ -1828,4 +1894,74 @@ public class ZKHelixAdmin implements HelixAdmin {
clusterConfig));
return true;
}
+
+ // TODO: refactor builder to reduce duplicate code with other Helix Java APIs
+ public static class Builder {
+ private String zkAddress;
+ private RealmAwareZkClient.RealmMode realmMode;
+ private RealmAwareZkClient.RealmAwareZkConnectionConfig realmAwareZkConnectionConfig;
+ private RealmAwareZkClient.RealmAwareZkClientConfig realmAwareZkClientConfig;
+
+ public Builder() {
+ }
+
+ public ZKHelixAdmin.Builder setZkAddress(String zkAddress) {
+ this.zkAddress = zkAddress;
+ return this;
+ }
+
+ public ZKHelixAdmin.Builder setRealmMode(RealmAwareZkClient.RealmMode realmMode) {
+ this.realmMode = realmMode;
+ return this;
+ }
+
+ public ZKHelixAdmin.Builder setRealmAwareZkConnectionConfig(
+ RealmAwareZkClient.RealmAwareZkConnectionConfig realmAwareZkConnectionConfig) {
+ realmAwareZkConnectionConfig = realmAwareZkConnectionConfig;
+ return this;
+ }
+
+ public ZKHelixAdmin.Builder setRealmAwareZkClientConfig(
+ RealmAwareZkClient.RealmAwareZkClientConfig realmAwareZkClientConfig) {
+ realmAwareZkClientConfig = realmAwareZkClientConfig;
+ return this;
+ }
+
+ public ZKHelixAdmin build() {
+ validate();
+ return new ZKHelixAdmin(this);
+ }
+
+ /*
+ * Validates the given parameters before creating an instance of ZKHelixAdmin.
+ */
+ private void validate() {
+ // Resolve RealmMode based on other parameters
+ boolean isZkAddressSet = zkAddress != null && !zkAddress.isEmpty();
+ if (realmMode == RealmAwareZkClient.RealmMode.SINGLE_REALM && !isZkAddressSet) {
+ throw new HelixException(
+ "RealmMode cannot be single-realm without a valid ZkAddress set!");
+ }
+ if (realmMode == RealmAwareZkClient.RealmMode.MULTI_REALM && isZkAddressSet) {
+ throw new HelixException(
+ "ZkAddress cannot be set on multi-realm mode!");
+ }
+ if (realmMode == null) {
+ realmMode = isZkAddressSet ? RealmAwareZkClient.RealmMode.SINGLE_REALM
+ : RealmAwareZkClient.RealmMode.MULTI_REALM;
+ }
+
+ // Resolve RealmAwareZkClientConfig
+ if (realmAwareZkClientConfig == null) {
+ realmAwareZkClientConfig = new RealmAwareZkClient.RealmAwareZkClientConfig();
+ }
+
+ // Resolve RealmAwareZkConnectionConfig
+ if (realmAwareZkConnectionConfig == null) {
+ // If not set, create a default one
+ realmAwareZkConnectionConfig =
+ new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build();
+ }
+ }
+ }
}
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index 789e922..72ce07f 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -19,6 +19,7 @@ package org.apache.helix.manager.zk;
* under the License.
*/
+import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
@@ -75,14 +76,17 @@ import org.apache.helix.model.LiveInstance;
import org.apache.helix.monitoring.ZKPathDataDumpTask;
import org.apache.helix.monitoring.mbeans.HelixCallbackMonitor;
import org.apache.helix.monitoring.mbeans.MonitorLevel;
+import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
import org.apache.helix.participant.HelixStateMachineEngine;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.store.zk.AutoFallbackPropertyStore;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.util.HelixUtil;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
+import org.apache.helix.zookeeper.impl.factory.HelixZkClientFactory;
import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
import org.apache.helix.zookeeper.zkclient.IZkStateListener;
import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
@@ -120,7 +124,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
private final String _version;
private int _reportLatency;
- protected HelixZkClient _zkclient = null;
+ protected RealmAwareZkClient _zkclient;
private final DefaultMessagingService _messagingService;
private Map<ChangeType, HelixCallbackMonitor> _callbackMonitors;
@@ -663,30 +667,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
}
void createClient() throws Exception {
- PathBasedZkSerializer zkSerializer =
- ChainedPathZkSerializer.builder(new ZNRecordSerializer()).build();
-
- HelixZkClient.ZkConnectionConfig connectionConfig = new HelixZkClient.ZkConnectionConfig(_zkAddress);
- connectionConfig.setSessionTimeout(_sessionTimeout);
- HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
- clientConfig
- .setZkSerializer(zkSerializer)
- .setConnectInitTimeout(_connectionInitTimeout)
- .setMonitorType(_instanceType.name())
- .setMonitorKey(_clusterName)
- .setMonitorInstanceName(_instanceName)
- .setMonitorRootPathOnly(isMonitorRootPathOnly());
-
- HelixZkClient newClient;
- switch (_instanceType) {
- case ADMINISTRATOR:
- newClient = SharedZkClientFactory.getInstance().buildZkClient(connectionConfig, clientConfig);
- break;
- default:
- newClient = DedicatedZkClientFactory
- .getInstance().buildZkClient(connectionConfig, clientConfig);
- break;
- }
+ final RealmAwareZkClient newClient = createSingleRealmZkClient();
synchronized (this) {
if (_zkclient != null) {
@@ -1296,4 +1277,75 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
public Long getSessionStartTime() {
return _sessionStartTime;
}
+
+ /*
+ * Prepares connection config and client config based on the internal parameters given to
+ * HelixManager in order to create a ZkClient instance to use. Note that a shared ZkClient
+ * instance will be created if connecting as an ADMINISTRATOR to minimize the cost of creating
+ * ZkConnections.
+ */
+ private RealmAwareZkClient createSingleRealmZkClient() {
+ final String shardingKey = buildShardingKey();
+ PathBasedZkSerializer zkSerializer =
+ ChainedPathZkSerializer.builder(new ZNRecordSerializer()).build();
+
+ RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig =
+ new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder()
+ .setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
+ .setZkRealmShardingKey(shardingKey)
+ .setSessionTimeout(_sessionTimeout).build();
+
+ RealmAwareZkClient.RealmAwareZkClientConfig clientConfig =
+ new RealmAwareZkClient.RealmAwareZkClientConfig();
+
+ clientConfig.setZkSerializer(zkSerializer)
+ .setConnectInitTimeout(_connectionInitTimeout)
+ .setMonitorType(_instanceType.name())
+ .setMonitorKey(_clusterName)
+ .setMonitorInstanceName(_instanceName)
+ .setMonitorRootPathOnly(isMonitorRootPathOnly());
+
+ if (_instanceType == InstanceType.ADMINISTRATOR) {
+ return resolveZkClient(SharedZkClientFactory.getInstance(), connectionConfig,
+ clientConfig);
+ }
+
+ return resolveZkClient(DedicatedZkClientFactory.getInstance(), connectionConfig,
+ clientConfig);
+ }
+
+ /*
+ * Resolves what type of ZkClient this HelixManager should use based on whether MULTI_ZK_ENABLED
+ * System config is set or not. Two types of ZkClients are available:
+ * 1) If MULTI_ZK_ENABLED is set to true, we create a dedicated RealmAwareZkClient
+ * that provides full ZkClient functionalities and connects to the correct ZK by querying
+ * MetadataStoreDirectoryService.
+ * 2) Otherwise, we create a dedicated HelixZkClient which plainly connects to
+ * the ZK address given.
+ */
+ private RealmAwareZkClient resolveZkClient(HelixZkClientFactory zkClientFactory,
+ RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig,
+ RealmAwareZkClient.RealmAwareZkClientConfig clientConfig) {
+ if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED)) {
+ try {
+ // Create realm-aware ZkClient.
+ return zkClientFactory.buildZkClient(connectionConfig, clientConfig);
+ } catch (IllegalArgumentException | IOException | InvalidRoutingDataException e) {
+ throw new HelixException("Not able to connect on realm-aware mode for sharding key: "
+ + connectionConfig.getZkRealmShardingKey(), e);
+ }
+ }
+
+ // If multi-zk mode is not enabled, create HelixZkClient with the provided zk address.
+ HelixZkClient.ZkClientConfig helixZkClientConfig = clientConfig.createHelixZkClientConfig();
+ HelixZkClient.ZkConnectionConfig helixZkConnectionConfig =
+ new HelixZkClient.ZkConnectionConfig(_zkAddress)
+ .setSessionTimeout(connectionConfig.getSessionTimeout());
+
+ return zkClientFactory.buildZkClient(helixZkConnectionConfig, helixZkClientConfig);
+ }
+
+ private String buildShardingKey() {
+ return _clusterName.charAt(0) == '/' ? _clusterName : "/" + _clusterName;
+ }
}
diff --git a/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java b/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
index c2b3d35..73dded4 100644
--- a/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
+++ b/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
@@ -43,6 +43,7 @@ import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.model.ExternalView;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.zkclient.IZkChildListener;
import org.apache.helix.zookeeper.zkclient.IZkDataListener;
import org.apache.helix.zookeeper.zkclient.IZkStateListener;
@@ -71,7 +72,7 @@ public class ZkTestHelper {
/**
* Simulate a zk state change by calling {@link ZkClient#process(WatchedEvent)} directly
*/
- public static void simulateZkStateReconnected(HelixZkClient client) {
+ public static void simulateZkStateReconnected(RealmAwareZkClient client) {
ZkClient zkClient = (ZkClient) client;
WatchedEvent event = new WatchedEvent(EventType.None, KeeperState.Disconnected, null);
zkClient.process(event);
@@ -84,7 +85,7 @@ public class ZkTestHelper {
* @param client
* @return
*/
- public static String getSessionId(HelixZkClient client) {
+ public static String getSessionId(RealmAwareZkClient client) {
ZkConnection connection = (ZkConnection) ((ZkClient) client).getConnection();
ZooKeeper curZookeeper = connection.getZookeeper();
return Long.toHexString(curZookeeper.getSessionId());
@@ -146,7 +147,7 @@ public class ZkTestHelper {
LOG.info("After expiry. sessionId: " + Long.toHexString(curZookeeper.getSessionId()));
}
- public static void expireSession(HelixZkClient client) throws Exception {
+ public static void expireSession(RealmAwareZkClient client) throws Exception {
final CountDownLatch waitNewSession = new CountDownLatch(1);
final ZkClient zkClient = (ZkClient) client;
@@ -213,7 +214,7 @@ public class ZkTestHelper {
* @param client
* @throws Exception
*/
- public static void asyncExpireSession(HelixZkClient client) throws Exception {
+ public static void asyncExpireSession(RealmAwareZkClient client) throws Exception {
final ZkClient zkClient = (ZkClient) client;
ZkConnection connection = ((ZkConnection) zkClient.getConnection());
ZooKeeper curZookeeper = connection.getZookeeper();
@@ -245,7 +246,7 @@ public class ZkTestHelper {
/*
* stateMap: partition->instance->state
*/
- public static boolean verifyState(HelixZkClient zkclient, String clusterName, String resourceName,
+ public static boolean verifyState(RealmAwareZkClient zkclient, String clusterName, String resourceName,
Map<String, Map<String, String>> expectStateMap, String op) {
boolean result = true;
ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(zkclient);
@@ -391,7 +392,7 @@ public class ZkTestHelper {
}
}
- public static Map<String, List<String>> getZkWatch(HelixZkClient client) throws Exception {
+ public static Map<String, List<String>> getZkWatch(RealmAwareZkClient client) throws Exception {
Map<String, List<String>> lists = new HashMap<String, List<String>>();
ZkClient zkClient = (ZkClient) client;
@@ -424,7 +425,7 @@ public class ZkTestHelper {
return lists;
}
- public static Map<String, Set<IZkDataListener>> getZkDataListener(HelixZkClient client)
+ public static Map<String, Set<IZkDataListener>> getZkDataListener(RealmAwareZkClient client)
throws Exception {
java.lang.reflect.Field field = getField(client.getClass(), "_dataListener");
field.setAccessible(true);
@@ -433,7 +434,7 @@ public class ZkTestHelper {
return dataListener;
}
- public static Map<String, Set<IZkChildListener>> getZkChildListener(HelixZkClient client)
+ public static Map<String, Set<IZkChildListener>> getZkChildListener(RealmAwareZkClient client)
throws Exception {
java.lang.reflect.Field field = getField(client.getClass(), "_childListener");
field.setAccessible(true);
@@ -442,7 +443,7 @@ public class ZkTestHelper {
return childListener;
}
- public static boolean tryWaitZkEventsCleaned(HelixZkClient zkclient) throws Exception {
+ public static boolean tryWaitZkEventsCleaned(RealmAwareZkClient zkclient) throws Exception {
java.lang.reflect.Field field = getField(zkclient.getClass(), "_eventThread");
field.setAccessible(true);
Object eventThread = field.get(zkclient);
@@ -468,7 +469,7 @@ public class ZkTestHelper {
return false;
}
- public static void injectExpire(HelixZkClient client)
+ public static void injectExpire(RealmAwareZkClient client)
throws ExecutionException, InterruptedException {
final ZkClient zkClient = (ZkClient) client;
Future future = _executor.submit(new Runnable() {
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java b/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java
index 0aa5787..0313a77 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java
@@ -47,6 +47,7 @@ import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.spectator.RoutingTableProvider;
import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -427,7 +428,7 @@ public class TestResourceGroupEndtoEnd extends ZkTestBase {
}
@Override
- public HelixZkClient getZkClient() {
+ public RealmAwareZkClient getZkClient() {
return _zkclient;
}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/controller/TestControllerLeadershipChange.java b/helix-core/src/test/java/org/apache/helix/integration/controller/TestControllerLeadershipChange.java
index 8a1ff03..5aa9a91 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/controller/TestControllerLeadershipChange.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/controller/TestControllerLeadershipChange.java
@@ -34,12 +34,12 @@ import org.apache.helix.common.ZkTestBase;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.CallbackHandler;
-import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -148,7 +148,7 @@ public class TestControllerLeadershipChange extends ZkTestBase {
callbackHandlers.forEach(callbackHandler -> Assert.assertTrue(callbackHandler.isReady()));
// check the zk connection is open
- HelixZkClient zkClient = controller.getZkClient();
+ RealmAwareZkClient zkClient = controller.getZkClient();
Assert.assertFalse(zkClient.isClosed());
Long sessionId = zkClient.getSessionId();
Assert.assertNotNull(sessionId);
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
index 7d810fb..9281e2d 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
@@ -27,6 +27,7 @@ import org.apache.helix.InstanceType;
import org.apache.helix.manager.zk.CallbackHandler;
import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -94,7 +95,7 @@ public class ClusterControllerManager extends ZKHelixManager implements Runnable
}
@Override
- public HelixZkClient getZkClient() {
+ public RealmAwareZkClient getZkClient() {
return _zkclient;
}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java
index 8e15928..397fae5 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java
@@ -28,6 +28,7 @@ import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.participant.DistClusterControllerStateModelFactory;
import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -83,7 +84,7 @@ public class ClusterDistributedController extends ZKHelixManager implements Runn
}
@Override
- public HelixZkClient getZkClient() {
+ public RealmAwareZkClient getZkClient() {
return _zkclient;
}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
index 4f1b966..4a44502 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
@@ -34,6 +34,7 @@ import org.apache.helix.mock.participant.MockSchemataModelFactory;
import org.apache.helix.mock.participant.MockTransition;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -136,7 +137,7 @@ public class MockParticipantManager extends ZKHelixManager implements Runnable,
}
@Override
- public HelixZkClient getZkClient() {
+ public RealmAwareZkClient getZkClient() {
return _zkclient;
}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/ZkTestManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/ZkTestManager.java
index 1a6903a..93e4b53 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/ZkTestManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/ZkTestManager.java
@@ -22,10 +22,11 @@ package org.apache.helix.integration.manager;
import java.util.List;
import org.apache.helix.manager.zk.CallbackHandler;
-import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+
public interface ZkTestManager {
- HelixZkClient getZkClient();
+ RealmAwareZkClient getZkClient();
List<CallbackHandler> getHandlers();
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleSession.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleSession.java
index 92eb3c4..54d38ee 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleSession.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleSession.java
@@ -41,6 +41,7 @@ import org.apache.helix.controller.GenericHelixController;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -570,7 +571,7 @@ public class TestHandleSession extends ZkTestBase {
return _handlers;
}
- HelixZkClient getZkClient() {
+ RealmAwareZkClient getZkClient() {
return _zkclient;
}
@@ -616,7 +617,7 @@ public class TestHandleSession extends ZkTestBase {
return _handlers;
}
- HelixZkClient getZkClient() {
+ RealmAwareZkClient getZkClient() {
return _zkclient;
}
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/HelixZkClientFactory.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/HelixZkClientFactory.java
index ca13321..1f792ee 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/HelixZkClientFactory.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/HelixZkClientFactory.java
@@ -29,7 +29,7 @@ import org.apache.helix.zookeeper.zkclient.ZkConnection;
/**
* Abstract class of the ZkClient factory.
*/
-abstract class HelixZkClientFactory implements RealmAwareZkClientFactory {
+public abstract class HelixZkClientFactory implements RealmAwareZkClientFactory {
/**
* Build a ZkClient using specified connection config and client config