You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2020/03/12 16:44:46 UTC
[helix] branch zooscalability updated: Make ZKHelixAdmin and
ZKHelixManager Realm-aware (#846)
This is an automated email from the ASF dual-hosted git repository.
hulee pushed a commit to branch zooscalability
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/zooscalability by this push:
new 74b220d Make ZKHelixAdmin and ZKHelixManager Realm-aware (#846)
74b220d is described below
commit 74b220d467f0a3c976f299d616fe143e3ca662d1
Author: Huizhi Lu <ih...@gmail.com>
AuthorDate: Thu Mar 12 09:44:38 2020 -0700
Make ZKHelixAdmin and ZKHelixManager Realm-aware (#846)
To make Helix Java APIs realm-aware, we need to make both ZKHelixAdmin and ZKHelixManager realm-aware. This commit adds a Builder to set client config and connection config for building realm-aware ZkClients underneath.
---
.../apache/helix/manager/zk/CallbackHandler.java | 7 +-
.../helix/manager/zk/ParticipantManager.java | 5 +-
.../org/apache/helix/manager/zk/ZKHelixAdmin.java | 202 +++++++++++++++++----
.../apache/helix/manager/zk/ZKHelixManager.java | 102 ++++++++---
.../test/java/org/apache/helix/ZkTestHelper.java | 21 ++-
.../integration/TestResourceGroupEndtoEnd.java | 3 +-
.../controller/TestControllerLeadershipChange.java | 4 +-
.../manager/ClusterControllerManager.java | 3 +-
.../manager/ClusterDistributedController.java | 3 +-
.../manager/MockParticipantManager.java | 3 +-
.../helix/integration/manager/ZkTestManager.java | 5 +-
.../helix/manager/zk/TestHandleNewSession.java | 5 +-
.../impl/factory/HelixZkClientFactory.java | 2 +-
13 files changed, 281 insertions(+), 84 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
index 82c9b29..bbb8788 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
@@ -64,6 +64,7 @@ import org.apache.helix.model.Message;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.monitoring.mbeans.HelixCallbackMonitor;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.zkclient.IZkChildListener;
import org.apache.helix.zookeeper.zkclient.IZkDataListener;
@@ -107,7 +108,7 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
private final Set<EventType> _eventTypes;
private final HelixDataAccessor _accessor;
private final ChangeType _changeType;
- private final HelixZkClient _zkClient;
+ private final RealmAwareZkClient _zkClient;
private final AtomicLong _lastNotificationTimeStamp;
private final HelixManager _manager;
private final PropertyKey _propertyKey;
@@ -191,12 +192,12 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
*/
private List<NotificationContext.Type> _expectTypes = nextNotificationType.get(Type.FINALIZE);
- public CallbackHandler(HelixManager manager, HelixZkClient client, PropertyKey propertyKey,
+ public CallbackHandler(HelixManager manager, RealmAwareZkClient client, PropertyKey propertyKey,
Object listener, EventType[] eventTypes, ChangeType changeType) {
this(manager, client, propertyKey, listener, eventTypes, changeType, null);
}
- public CallbackHandler(HelixManager manager, HelixZkClient client, PropertyKey propertyKey,
+ public CallbackHandler(HelixManager manager, RealmAwareZkClient client, PropertyKey propertyKey,
Object listener, EventType[] eventTypes, ChangeType changeType,
HelixCallbackMonitor monitor) {
if (listener == null) {
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
index 411d937..5090a86 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
@@ -47,6 +47,7 @@ import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.participant.statemachine.ScheduledTaskStateModelFactory;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.datamodel.ZNRecordBucketizer;
import org.apache.helix.zookeeper.zkclient.DataUpdater;
@@ -62,7 +63,7 @@ import org.slf4j.LoggerFactory;
public class ParticipantManager {
private static Logger LOG = LoggerFactory.getLogger(ParticipantManager.class);
- final HelixZkClient _zkclient;
+ final RealmAwareZkClient _zkclient;
final HelixManager _manager;
final PropertyKey.Builder _keyBuilder;
final String _clusterName;
@@ -81,7 +82,7 @@ public class ParticipantManager {
// session race condition when handling new session for the participant.
private final String _sessionId;
- public ParticipantManager(HelixManager manager, HelixZkClient zkclient, int sessionTimeout,
+ public ParticipantManager(HelixManager manager, RealmAwareZkClient zkclient, int sessionTimeout,
LiveInstanceInfoProvider liveInstanceInfoProvider, List<PreConnectCallback> preConnectCallbacks,
final String sessionId) {
_zkclient = zkclient;
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index d7c40ff..ea9b55a 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -47,9 +47,9 @@ import org.apache.helix.HelixDefinedState;
import org.apache.helix.HelixException;
import org.apache.helix.InstanceType;
import org.apache.helix.PropertyKey;
-import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.PropertyPathBuilder;
import org.apache.helix.PropertyType;
+import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
@@ -74,12 +74,14 @@ import org.apache.helix.model.Message.MessageType;
import org.apache.helix.model.PauseSignal;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
import org.apache.helix.tools.DefaultIdealStateCalculator;
import org.apache.helix.util.HelixUtil;
import org.apache.helix.util.RebalanceUtil;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
import org.apache.helix.zookeeper.zkclient.DataUpdater;
import org.apache.helix.zookeeper.zkclient.exception.ZkException;
@@ -90,18 +92,27 @@ import org.slf4j.LoggerFactory;
public class ZKHelixAdmin implements HelixAdmin {
+ private static final Logger LOG = LoggerFactory.getLogger(ZKHelixAdmin.class);
+
public static final String CONNECTION_TIMEOUT = "helixAdmin.timeOutInSec";
private static final String MAINTENANCE_ZNODE_ID = "maintenance";
private static final int DEFAULT_SUPERCLUSTER_REPLICA = 3;
private final RealmAwareZkClient _zkClient;
private final ConfigAccessor _configAccessor;
- // true if ZKHelixAdmin was instantiated with a HelixZkClient, false otherwise
+ // true if ZKHelixAdmin was instantiated with a RealmAwareZkClient, false otherwise
// This is used for close() to determine how ZKHelixAdmin should close the underlying ZkClient
private final boolean _usesExternalZkClient;
private static Logger logger = LoggerFactory.getLogger(ZKHelixAdmin.class);
+ /**
+ * @deprecated it is recommended to use the builder constructor {@link Builder}
+ * instead to avoid having to manually create and maintain a RealmAwareZkClient
+ * outside of ZKHelixAdmin.
+ *
+ * @param zkClient A created RealmAwareZkClient
+ */
@Deprecated
public ZKHelixAdmin(RealmAwareZkClient zkClient) {
_zkClient = zkClient;
@@ -109,14 +120,69 @@ public class ZKHelixAdmin implements HelixAdmin {
_usesExternalZkClient = true;
}
+ /**
+ * There are 2 realm-aware modes to connect to ZK:
+ * 1. if system property {@link SystemPropertyKeys#MULTI_ZK_ENABLED} is set to <code>"true"</code>,
+ * it will connect on multi-realm mode;
+ * 2. otherwise, it will connect on single-realm mode to the <code>zkAddress</code> provided.
+ *
+ * @param zkAddress ZK address
+ * @exception HelixException if not able to connect on multi-realm mode
+ *
+ * @deprecated it is recommended to use the builder constructor {@link Builder}
+ */
+ @Deprecated
public ZKHelixAdmin(String zkAddress) {
int timeOutInSec = Integer.parseInt(System.getProperty(CONNECTION_TIMEOUT, "30"));
- HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
- clientConfig.setZkSerializer(new ZNRecordSerializer())
- .setConnectInitTimeout(timeOutInSec * 1000);
- _zkClient = SharedZkClientFactory.getInstance()
- .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress), clientConfig);
- _zkClient.waitUntilConnected(timeOutInSec, TimeUnit.SECONDS);
+ RealmAwareZkClient.RealmAwareZkClientConfig clientConfig =
+ new RealmAwareZkClient.RealmAwareZkClientConfig()
+ .setConnectInitTimeout(timeOutInSec * 1000L)
+ .setZkSerializer(new ZNRecordSerializer());
+
+ RealmAwareZkClient zkClient;
+
+ if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED)) {
+ try {
+ zkClient = new FederatedZkClient(
+ new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build(), clientConfig);
+ } catch (IllegalStateException | IOException | InvalidRoutingDataException e) {
+ throw new HelixException("Not able to connect on multi-realm mode.", e);
+ }
+ } else {
+ zkClient = SharedZkClientFactory.getInstance()
+ .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress),
+ clientConfig.createHelixZkClientConfig());
+ zkClient.waitUntilConnected(timeOutInSec, TimeUnit.SECONDS);
+ }
+
+ _zkClient = zkClient;
+ _configAccessor = new ConfigAccessor(_zkClient);
+ _usesExternalZkClient = false;
+ }
+
+ private ZKHelixAdmin(Builder builder) {
+ RealmAwareZkClient zkClient;
+ switch (builder.realmMode) {
+ case MULTI_REALM:
+ try {
+ zkClient = new FederatedZkClient(builder.realmAwareZkConnectionConfig,
+ builder.realmAwareZkClientConfig);
+ } catch (IOException | InvalidRoutingDataException | IllegalStateException e) {
+ throw new HelixException("Not able to connect on multi-realm mode.", e);
+ }
+ break;
+ case SINGLE_REALM:
+ // Create a HelixZkClient: Use a SharedZkClient because ZKHelixAdmin does not need to do
+ // ephemeral operations
+ zkClient = SharedZkClientFactory.getInstance()
+ .buildZkClient(new HelixZkClient.ZkConnectionConfig(builder.zkAddress),
+ builder.realmAwareZkClientConfig.createHelixZkClientConfig());
+ break;
+ default:
+ throw new HelixException("Invalid RealmMode given: " + builder.realmMode);
+ }
+
+ _zkClient = zkClient;
_configAccessor = new ConfigAccessor(_zkClient);
_usesExternalZkClient = false;
}
@@ -206,7 +272,7 @@ public class ZKHelixAdmin implements HelixAdmin {
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
return accessor.getProperty(keyBuilder.instanceConfig(instanceName));
}
@@ -382,7 +448,7 @@ public class ZKHelixAdmin implements HelixAdmin {
reason == null ? "NULL" : reason);
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
if (enabled) {
accessor.removeProperty(keyBuilder.pause());
@@ -407,7 +473,7 @@ public class ZKHelixAdmin implements HelixAdmin {
public boolean isInMaintenanceMode(String clusterName) {
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
return accessor.getBaseDataAccessor()
.exists(keyBuilder.maintenance().getPath(), AccessOption.PERSISTENT);
}
@@ -448,7 +514,7 @@ public class ZKHelixAdmin implements HelixAdmin {
final MaintenanceSignal.TriggeringEntity triggeringEntity) {
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
logger.info("Cluster {} {} {} maintenance mode for reason {}.", clusterName,
triggeringEntity == MaintenanceSignal.TriggeringEntity.CONTROLLER ? "automatically"
: "manually", enabled ? "enters" : "exits", reason == null ? "NULL" : reason);
@@ -516,7 +582,7 @@ public class ZKHelixAdmin implements HelixAdmin {
instanceName, clusterName);
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
// check the instance is alive
LiveInstance liveInstance = accessor.getProperty(keyBuilder.liveInstance(instanceName));
@@ -634,7 +700,7 @@ public class ZKHelixAdmin implements HelixAdmin {
instanceNames == null ? "NULL" : HelixUtil.serializeByComma(instanceNames), clusterName);
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
List<ExternalView> extViews = accessor.getChildValues(keyBuilder.externalViews());
Set<String> resetInstanceNames = new HashSet<String>(instanceNames);
@@ -662,7 +728,7 @@ public class ZKHelixAdmin implements HelixAdmin {
resourceNames == null ? "NULL" : HelixUtil.serializeByComma(resourceNames), clusterName);
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
List<ExternalView> extViews = accessor.getChildValues(keyBuilder.externalViews());
Set<String> resetResourceNames = new HashSet<String>(resourceNames);
@@ -790,7 +856,7 @@ public class ZKHelixAdmin implements HelixAdmin {
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
for (String instanceName : instances) {
InstanceConfig config = accessor.getProperty(keyBuilder.instanceConfig(instanceName));
@@ -909,7 +975,7 @@ public class ZKHelixAdmin implements HelixAdmin {
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
for (String resourceName : getResourcesInCluster(clusterName)) {
IdealState is = accessor.getProperty(keyBuilder.idealStates(resourceName));
@@ -925,7 +991,7 @@ public class ZKHelixAdmin implements HelixAdmin {
public IdealState getResourceIdealState(String clusterName, String resourceName) {
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
return accessor.getProperty(keyBuilder.idealStates(resourceName));
}
@@ -938,7 +1004,7 @@ public class ZKHelixAdmin implements HelixAdmin {
clusterName, idealState == null ? "NULL" : idealState.toString());
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
accessor.setProperty(keyBuilder.idealStates(resourceName), idealState);
}
@@ -981,7 +1047,7 @@ public class ZKHelixAdmin implements HelixAdmin {
public ExternalView getResourceExternalView(String clusterName, String resourceName) {
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
return accessor.getProperty(keyBuilder.externalView(resourceName));
}
@@ -1015,7 +1081,7 @@ public class ZKHelixAdmin implements HelixAdmin {
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
accessor.setProperty(keyBuilder.stateModelDef(stateModelDef), stateModel);
}
@@ -1024,7 +1090,7 @@ public class ZKHelixAdmin implements HelixAdmin {
logger.info("Drop resource {} from cluster {}", resourceName, clusterName);
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
accessor.removeProperty(keyBuilder.idealStates(resourceName));
accessor.removeProperty(keyBuilder.resourceConfig(resourceName));
@@ -1039,7 +1105,7 @@ public class ZKHelixAdmin implements HelixAdmin {
public StateModelDefinition getStateModelDef(String clusterName, String stateModelName) {
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
return accessor.getProperty(keyBuilder.stateModelDef(stateModelName));
}
@@ -1049,7 +1115,7 @@ public class ZKHelixAdmin implements HelixAdmin {
logger.info("Deleting cluster {}.", clusterName);
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
String root = "/" + clusterName;
if (accessor.getChildNames(keyBuilder.liveInstances()).size() > 0) {
@@ -1093,7 +1159,7 @@ public class ZKHelixAdmin implements HelixAdmin {
ZKHelixDataAccessor accessor =
new ZKHelixDataAccessor(grandCluster, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
accessor.setProperty(keyBuilder.idealStates(idealState.getResourceName()), idealState);
}
@@ -1288,7 +1354,7 @@ public class ZKHelixAdmin implements HelixAdmin {
constraintId, clusterName);
BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
- Builder keyBuilder = new Builder(clusterName);
+ PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
String path = keyBuilder.constraint(constraintType.toString()).getPath();
baseAccessor.update(path, new DataUpdater<ZNRecord>() {
@@ -1311,7 +1377,7 @@ public class ZKHelixAdmin implements HelixAdmin {
constraintId, clusterName);
BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
- Builder keyBuilder = new Builder(clusterName);
+ PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
String path = keyBuilder.constraint(constraintType.toString()).getPath();
baseAccessor.update(path, new DataUpdater<ZNRecord>() {
@@ -1333,7 +1399,7 @@ public class ZKHelixAdmin implements HelixAdmin {
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = new Builder(clusterName);
+ PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
return accessor.getProperty(keyBuilder.constraint(constraintType.toString()));
}
@@ -1415,7 +1481,7 @@ public class ZKHelixAdmin implements HelixAdmin {
}
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
InstanceConfig config = accessor.getProperty(keyBuilder.instanceConfig(instanceName));
config.addTag(tag);
@@ -1436,7 +1502,7 @@ public class ZKHelixAdmin implements HelixAdmin {
}
ZKHelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
InstanceConfig config = accessor.getProperty(keyBuilder.instanceConfig(instanceName));
config.removeTag(tag);
@@ -1457,7 +1523,7 @@ public class ZKHelixAdmin implements HelixAdmin {
}
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
InstanceConfig config = accessor.getProperty(keyBuilder.instanceConfig(instanceName));
config.setZoneId(zoneId);
@@ -1649,7 +1715,7 @@ public class ZKHelixAdmin implements HelixAdmin {
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
List<IdealState> idealStates = accessor.getChildValues(keyBuilder.idealStates());
List<String> nullIdealStates = new ArrayList<>();
for (int i = 0; i < idealStates.size(); i++) {
@@ -1690,7 +1756,7 @@ public class ZKHelixAdmin implements HelixAdmin {
// Ensure that all instances are valid
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_zkClient));
- Builder keyBuilder = accessor.keyBuilder();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
List<String> instances = accessor.getChildNames(keyBuilder.instanceConfigs());
if (validateInstancesForWagedRebalance(clusterName, instances).containsValue(false)) {
throw new HelixException(String
@@ -1800,4 +1866,74 @@ public class ZKHelixAdmin implements HelixAdmin {
clusterConfig));
return true;
}
+
+ // TODO: refactor builder to reduce duplicate code with other Helix Java APIs
+ public static class Builder {
+ private String zkAddress;
+ private RealmAwareZkClient.RealmMode realmMode;
+ private RealmAwareZkClient.RealmAwareZkConnectionConfig realmAwareZkConnectionConfig;
+ private RealmAwareZkClient.RealmAwareZkClientConfig realmAwareZkClientConfig;
+
+ public Builder() {
+ }
+
+ public ZKHelixAdmin.Builder setZkAddress(String zkAddress) {
+ this.zkAddress = zkAddress;
+ return this;
+ }
+
+ public ZKHelixAdmin.Builder setRealmMode(RealmAwareZkClient.RealmMode realmMode) {
+ this.realmMode = realmMode;
+ return this;
+ }
+
+    public ZKHelixAdmin.Builder setRealmAwareZkConnectionConfig(
+        RealmAwareZkClient.RealmAwareZkConnectionConfig realmAwareZkConnectionConfig) {
+      this.realmAwareZkConnectionConfig = realmAwareZkConnectionConfig; // field, not the param
+      return this;
+    }
+
+    public ZKHelixAdmin.Builder setRealmAwareZkClientConfig(
+        RealmAwareZkClient.RealmAwareZkClientConfig realmAwareZkClientConfig) {
+      this.realmAwareZkClientConfig = realmAwareZkClientConfig; // field, not the param
+      return this;
+    }
+
+ public ZKHelixAdmin build() {
+ validate();
+ return new ZKHelixAdmin(this);
+ }
+
+ /*
+ * Validates the given parameters before creating an instance of ZKHelixAdmin.
+ */
+ private void validate() {
+ // Resolve RealmMode based on other parameters
+ boolean isZkAddressSet = zkAddress != null && !zkAddress.isEmpty();
+ if (realmMode == RealmAwareZkClient.RealmMode.SINGLE_REALM && !isZkAddressSet) {
+ throw new HelixException(
+ "RealmMode cannot be single-realm without a valid ZkAddress set!");
+ }
+ if (realmMode == RealmAwareZkClient.RealmMode.MULTI_REALM && isZkAddressSet) {
+ throw new HelixException(
+ "ZkAddress cannot be set on multi-realm mode!");
+ }
+ if (realmMode == null) {
+ realmMode = isZkAddressSet ? RealmAwareZkClient.RealmMode.SINGLE_REALM
+ : RealmAwareZkClient.RealmMode.MULTI_REALM;
+ }
+
+ // Resolve RealmAwareZkClientConfig
+ if (realmAwareZkClientConfig == null) {
+ realmAwareZkClientConfig = new RealmAwareZkClient.RealmAwareZkClientConfig();
+ }
+
+ // Resolve RealmAwareZkConnectionConfig
+ if (realmAwareZkConnectionConfig == null) {
+ // If not set, create a default one
+ realmAwareZkConnectionConfig =
+ new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build();
+ }
+ }
+ }
}
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index b452b4c..f7180b2 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -19,6 +19,7 @@ package org.apache.helix.manager.zk;
* under the License.
*/
+import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
@@ -73,14 +74,17 @@ import org.apache.helix.model.LiveInstance;
import org.apache.helix.monitoring.ZKPathDataDumpTask;
import org.apache.helix.monitoring.mbeans.HelixCallbackMonitor;
import org.apache.helix.monitoring.mbeans.MonitorLevel;
+import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
import org.apache.helix.participant.HelixStateMachineEngine;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.store.zk.AutoFallbackPropertyStore;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.util.HelixUtil;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
+import org.apache.helix.zookeeper.impl.factory.HelixZkClientFactory;
import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
import org.apache.helix.zookeeper.zkclient.IZkStateListener;
import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
@@ -117,7 +121,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
private final String _version;
private int _reportLatency;
- protected HelixZkClient _zkclient = null;
+ protected RealmAwareZkClient _zkclient;
private final DefaultMessagingService _messagingService;
private Map<ChangeType, HelixCallbackMonitor> _callbackMonitors;
@@ -652,30 +656,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
}
void createClient() throws Exception {
- PathBasedZkSerializer zkSerializer =
- ChainedPathZkSerializer.builder(new ZNRecordSerializer()).build();
-
- HelixZkClient.ZkConnectionConfig connectionConfig = new HelixZkClient.ZkConnectionConfig(_zkAddress);
- connectionConfig.setSessionTimeout(_sessionTimeout);
- HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
- clientConfig
- .setZkSerializer(zkSerializer)
- .setConnectInitTimeout(_connectionInitTimeout)
- .setMonitorType(_instanceType.name())
- .setMonitorKey(_clusterName)
- .setMonitorInstanceName(_instanceName)
- .setMonitorRootPathOnly(isMonitorRootPathOnly());
-
- HelixZkClient newClient;
- switch (_instanceType) {
- case ADMINISTRATOR:
- newClient = SharedZkClientFactory.getInstance().buildZkClient(connectionConfig, clientConfig);
- break;
- default:
- newClient = DedicatedZkClientFactory
- .getInstance().buildZkClient(connectionConfig, clientConfig);
- break;
- }
+ final RealmAwareZkClient newClient = createSingleRealmZkClient();
synchronized (this) {
if (_zkclient != null) {
@@ -1285,4 +1266,75 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
public Long getSessionStartTime() {
return _sessionStartTime;
}
+
+ /*
+ * Prepares connection config and client config based on the internal parameters given to
+ * HelixManager in order to create a ZkClient instance to use. Note that a shared ZkClient
+ * instance will be created if connecting as an ADMINISTRATOR to minimize the cost of creating
+ * ZkConnections.
+ */
+ private RealmAwareZkClient createSingleRealmZkClient() {
+ final String shardingKey = buildShardingKey();
+ PathBasedZkSerializer zkSerializer =
+ ChainedPathZkSerializer.builder(new ZNRecordSerializer()).build();
+
+ RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig =
+ new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder()
+ .setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
+ .setZkRealmShardingKey(shardingKey)
+ .setSessionTimeout(_sessionTimeout).build();
+
+ RealmAwareZkClient.RealmAwareZkClientConfig clientConfig =
+ new RealmAwareZkClient.RealmAwareZkClientConfig();
+
+ clientConfig.setZkSerializer(zkSerializer)
+ .setConnectInitTimeout(_connectionInitTimeout)
+ .setMonitorType(_instanceType.name())
+ .setMonitorKey(_clusterName)
+ .setMonitorInstanceName(_instanceName)
+ .setMonitorRootPathOnly(isMonitorRootPathOnly());
+
+ if (_instanceType == InstanceType.ADMINISTRATOR) {
+ return resolveZkClient(SharedZkClientFactory.getInstance(), connectionConfig,
+ clientConfig);
+ }
+
+ return resolveZkClient(DedicatedZkClientFactory.getInstance(), connectionConfig,
+ clientConfig);
+ }
+
+  /*
+   * Resolves what type of ZkClient this HelixManager should use based on whether the
+   * MULTI_ZK_ENABLED system property is set. The given factory decides whether the client is
+   * shared or dedicated. Two types of ZkClients are available:
+   * 1) If MULTI_ZK_ENABLED is set to true, we create a RealmAwareZkClient that provides full
+   * ZkClient functionalities and connects to the correct ZK by querying
+   * MetadataStoreDirectoryService.
+   * 2) Otherwise, we create a HelixZkClient which plainly connects to the ZK address given.
+   */
+ private RealmAwareZkClient resolveZkClient(HelixZkClientFactory zkClientFactory,
+ RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig,
+ RealmAwareZkClient.RealmAwareZkClientConfig clientConfig) {
+ if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED)) {
+ try {
+ // Create realm-aware ZkClient.
+ return zkClientFactory.buildZkClient(connectionConfig, clientConfig);
+ } catch (IllegalArgumentException | IOException | InvalidRoutingDataException e) {
+ throw new HelixException("Not able to connect on realm-aware mode for sharding key: "
+ + connectionConfig.getZkRealmShardingKey(), e);
+ }
+ }
+
+ // If multi-zk mode is not enabled, create HelixZkClient with the provided zk address.
+ HelixZkClient.ZkClientConfig helixZkClientConfig = clientConfig.createHelixZkClientConfig();
+ HelixZkClient.ZkConnectionConfig helixZkConnectionConfig =
+ new HelixZkClient.ZkConnectionConfig(_zkAddress)
+ .setSessionTimeout(connectionConfig.getSessionTimeout());
+
+ return zkClientFactory.buildZkClient(helixZkConnectionConfig, helixZkClientConfig);
+ }
+
+ private String buildShardingKey() {
+ return _clusterName.charAt(0) == '/' ? _clusterName : "/" + _clusterName;
+ }
}
diff --git a/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java b/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
index c2b3d35..73dded4 100644
--- a/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
+++ b/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
@@ -43,6 +43,7 @@ import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.model.ExternalView;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.zkclient.IZkChildListener;
import org.apache.helix.zookeeper.zkclient.IZkDataListener;
import org.apache.helix.zookeeper.zkclient.IZkStateListener;
@@ -71,7 +72,7 @@ public class ZkTestHelper {
/**
* Simulate a zk state change by calling {@link ZkClient#process(WatchedEvent)} directly
*/
- public static void simulateZkStateReconnected(HelixZkClient client) {
+ public static void simulateZkStateReconnected(RealmAwareZkClient client) {
ZkClient zkClient = (ZkClient) client;
WatchedEvent event = new WatchedEvent(EventType.None, KeeperState.Disconnected, null);
zkClient.process(event);
@@ -84,7 +85,7 @@ public class ZkTestHelper {
* @param client
* @return
*/
- public static String getSessionId(HelixZkClient client) {
+ public static String getSessionId(RealmAwareZkClient client) {
ZkConnection connection = (ZkConnection) ((ZkClient) client).getConnection();
ZooKeeper curZookeeper = connection.getZookeeper();
return Long.toHexString(curZookeeper.getSessionId());
@@ -146,7 +147,7 @@ public class ZkTestHelper {
LOG.info("After expiry. sessionId: " + Long.toHexString(curZookeeper.getSessionId()));
}
- public static void expireSession(HelixZkClient client) throws Exception {
+ public static void expireSession(RealmAwareZkClient client) throws Exception {
final CountDownLatch waitNewSession = new CountDownLatch(1);
final ZkClient zkClient = (ZkClient) client;
@@ -213,7 +214,7 @@ public class ZkTestHelper {
* @param client
* @throws Exception
*/
- public static void asyncExpireSession(HelixZkClient client) throws Exception {
+ public static void asyncExpireSession(RealmAwareZkClient client) throws Exception {
final ZkClient zkClient = (ZkClient) client;
ZkConnection connection = ((ZkConnection) zkClient.getConnection());
ZooKeeper curZookeeper = connection.getZookeeper();
@@ -245,7 +246,7 @@ public class ZkTestHelper {
/*
* stateMap: partition->instance->state
*/
- public static boolean verifyState(HelixZkClient zkclient, String clusterName, String resourceName,
+ public static boolean verifyState(RealmAwareZkClient zkclient, String clusterName, String resourceName,
Map<String, Map<String, String>> expectStateMap, String op) {
boolean result = true;
ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(zkclient);
@@ -391,7 +392,7 @@ public class ZkTestHelper {
}
}
- public static Map<String, List<String>> getZkWatch(HelixZkClient client) throws Exception {
+ public static Map<String, List<String>> getZkWatch(RealmAwareZkClient client) throws Exception {
Map<String, List<String>> lists = new HashMap<String, List<String>>();
ZkClient zkClient = (ZkClient) client;
@@ -424,7 +425,7 @@ public class ZkTestHelper {
return lists;
}
- public static Map<String, Set<IZkDataListener>> getZkDataListener(HelixZkClient client)
+ public static Map<String, Set<IZkDataListener>> getZkDataListener(RealmAwareZkClient client)
throws Exception {
java.lang.reflect.Field field = getField(client.getClass(), "_dataListener");
field.setAccessible(true);
@@ -433,7 +434,7 @@ public class ZkTestHelper {
return dataListener;
}
- public static Map<String, Set<IZkChildListener>> getZkChildListener(HelixZkClient client)
+ public static Map<String, Set<IZkChildListener>> getZkChildListener(RealmAwareZkClient client)
throws Exception {
java.lang.reflect.Field field = getField(client.getClass(), "_childListener");
field.setAccessible(true);
@@ -442,7 +443,7 @@ public class ZkTestHelper {
return childListener;
}
- public static boolean tryWaitZkEventsCleaned(HelixZkClient zkclient) throws Exception {
+ public static boolean tryWaitZkEventsCleaned(RealmAwareZkClient zkclient) throws Exception {
java.lang.reflect.Field field = getField(zkclient.getClass(), "_eventThread");
field.setAccessible(true);
Object eventThread = field.get(zkclient);
@@ -468,7 +469,7 @@ public class ZkTestHelper {
return false;
}
- public static void injectExpire(HelixZkClient client)
+ public static void injectExpire(RealmAwareZkClient client)
throws ExecutionException, InterruptedException {
final ZkClient zkClient = (ZkClient) client;
Future future = _executor.submit(new Runnable() {
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java b/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java
index 0aa5787..0313a77 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java
@@ -47,6 +47,7 @@ import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.spectator.RoutingTableProvider;
import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -427,7 +428,7 @@ public class TestResourceGroupEndtoEnd extends ZkTestBase {
}
@Override
- public HelixZkClient getZkClient() {
+ public RealmAwareZkClient getZkClient() {
return _zkclient;
}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/controller/TestControllerLeadershipChange.java b/helix-core/src/test/java/org/apache/helix/integration/controller/TestControllerLeadershipChange.java
index 8a1ff03..5aa9a91 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/controller/TestControllerLeadershipChange.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/controller/TestControllerLeadershipChange.java
@@ -34,12 +34,12 @@ import org.apache.helix.common.ZkTestBase;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.CallbackHandler;
-import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -148,7 +148,7 @@ public class TestControllerLeadershipChange extends ZkTestBase {
callbackHandlers.forEach(callbackHandler -> Assert.assertTrue(callbackHandler.isReady()));
// check the zk connection is open
- HelixZkClient zkClient = controller.getZkClient();
+ RealmAwareZkClient zkClient = controller.getZkClient();
Assert.assertFalse(zkClient.isClosed());
Long sessionId = zkClient.getSessionId();
Assert.assertNotNull(sessionId);
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
index 7d810fb..9281e2d 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
@@ -27,6 +27,7 @@ import org.apache.helix.InstanceType;
import org.apache.helix.manager.zk.CallbackHandler;
import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -94,7 +95,7 @@ public class ClusterControllerManager extends ZKHelixManager implements Runnable
}
@Override
- public HelixZkClient getZkClient() {
+ public RealmAwareZkClient getZkClient() {
return _zkclient;
}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java
index 8e15928..397fae5 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java
@@ -28,6 +28,7 @@ import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.participant.DistClusterControllerStateModelFactory;
import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -83,7 +84,7 @@ public class ClusterDistributedController extends ZKHelixManager implements Runn
}
@Override
- public HelixZkClient getZkClient() {
+ public RealmAwareZkClient getZkClient() {
return _zkclient;
}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
index 0036e0d..84bc334 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
@@ -33,6 +33,7 @@ import org.apache.helix.mock.participant.MockSchemataModelFactory;
import org.apache.helix.mock.participant.MockTransition;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -128,7 +129,7 @@ public class MockParticipantManager extends ZKHelixManager implements Runnable,
}
@Override
- public HelixZkClient getZkClient() {
+ public RealmAwareZkClient getZkClient() {
return _zkclient;
}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/ZkTestManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/ZkTestManager.java
index 1a6903a..93e4b53 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/ZkTestManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/ZkTestManager.java
@@ -22,10 +22,11 @@ package org.apache.helix.integration.manager;
import java.util.List;
import org.apache.helix.manager.zk.CallbackHandler;
-import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+
public interface ZkTestManager {
- HelixZkClient getZkClient();
+ RealmAwareZkClient getZkClient();
List<CallbackHandler> getHandlers();
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java
index 00bedcf..e4c6695 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java
@@ -35,6 +35,7 @@ import org.apache.helix.common.ZkTestBase;
import org.apache.helix.controller.GenericHelixController;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.helix.model.LiveInstance;
import org.testng.Assert;
@@ -492,7 +493,7 @@ public class TestHandleNewSession extends ZkTestBase {
return _handlers;
}
- HelixZkClient getZkClient() {
+ RealmAwareZkClient getZkClient() {
return _zkclient;
}
@@ -538,7 +539,7 @@ public class TestHandleNewSession extends ZkTestBase {
return _handlers;
}
- HelixZkClient getZkClient() {
+ RealmAwareZkClient getZkClient() {
return _zkclient;
}
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/HelixZkClientFactory.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/HelixZkClientFactory.java
index ca13321..1f792ee 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/HelixZkClientFactory.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/factory/HelixZkClientFactory.java
@@ -29,7 +29,7 @@ import org.apache.helix.zookeeper.zkclient.ZkConnection;
/**
* Abstract class of the ZkClient factory.
*/
-abstract class HelixZkClientFactory implements RealmAwareZkClientFactory {
+public abstract class HelixZkClientFactory implements RealmAwareZkClientFactory {
/**
* Build a ZkClient using specified connection config and client config