You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/10/30 01:16:03 UTC
[4/4] helix git commit: Using HelixZkClient to replace ZkClient in
helix-core and helix-rest.
Using HelixZkClient to replace ZkClient in helix-core and helix-rest.
1. Replace as much usage as possible. For the raw ZkClient tests, the usages are kept.
2. For backward compatibility, some public interfaces still returns ZkClient. Marks them as Deprecated.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/9d7364d7
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/9d7364d7
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/9d7364d7
Branch: refs/heads/master
Commit: 9d7364d7abba3932a1b25e96e4eb9dd3e203cec9
Parents: 01076ca
Author: Jiajun Wang <jj...@linkedin.com>
Authored: Wed Sep 26 11:39:42 2018 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Mon Oct 29 18:15:22 2018 -0700
----------------------------------------------------------------------
.../controller/HierarchicalDataHolder.java | 10 +-
.../examples/IdealStateBuilderExample.java | 12 +-
.../helix/examples/IdealStateExample.java | 14 +-
.../helix/manager/zk/CallbackHandler.java | 10 +-
.../helix/manager/zk/ParticipantManager.java | 2 +-
.../apache/helix/manager/zk/ZKHelixManager.java | 12 +-
.../helix/manager/zk/zookeeper/ZkClient.java | 4 +
.../java/org/apache/helix/task/TaskDriver.java | 8 +-
.../tools/ClusterExternalViewVerifier.java | 4 +-
.../helix/tools/ClusterLiveNodesVerifier.java | 6 +-
.../org/apache/helix/tools/ClusterSetup.java | 13 +-
.../helix/tools/ClusterStateVerifier.java | 33 ++-
.../org/apache/helix/tools/ClusterVerifier.java | 6 +-
.../BestPossibleExternalViewVerifier.java | 13 +-
.../ClusterLiveNodesVerifier.java | 6 +-
.../StrictMatchExternalViewVerifier.java | 14 +-
.../ZkHelixClusterVerifier.java | 19 +-
.../org/apache/helix/tools/TestExecutor.java | 13 +-
.../tools/commandtools/IntegrationTestUtil.java | 19 +-
.../org/apache/helix/util/ZKClientPool.java | 2 +-
.../test/java/org/apache/helix/TestHelper.java | 33 ++-
.../apache/helix/TestHierarchicalDataStore.java | 24 +-
.../java/org/apache/helix/TestZKCallback.java | 2 -
.../test/java/org/apache/helix/TestZkBasis.java | 88 ++++++-
.../java/org/apache/helix/TestZnodeModify.java | 21 +-
.../java/org/apache/helix/ZkTestHelper.java | 68 +++--
.../org/apache/helix/common/ZkTestBase.java | 54 ++--
.../TestCorrectnessOnConnectivityLoss.java | 18 +-
.../apache/helix/integration/TestDriver.java | 84 +++---
.../integration/TestEnableCompression.java | 15 +-
.../integration/TestEntropyFreeNodeBounce.java | 2 +-
.../helix/integration/TestPauseSignal.java | 6 +-
.../integration/TestResourceGroupEndtoEnd.java | 6 +-
.../helix/integration/TestStatusUpdate.java | 7 +-
.../integration/TestZkCallbackHandlerLeak.java | 4 +-
.../helix/integration/TestZkConnectionLost.java | 16 +-
.../manager/ClusterControllerManager.java | 6 +-
.../manager/ClusterDistributedController.java | 6 +-
.../manager/MockParticipantManager.java | 6 +-
.../integration/manager/ZkTestManager.java | 4 +-
.../rebalancer/TestAutoRebalance.java | 7 +-
.../TestAutoRebalancePartitionLimit.java | 8 +-
.../TestCustomizedIdealStateRebalancer.java | 8 +-
.../rebalancer/TestFullAutoNodeTagging.java | 15 +-
.../helix/manager/zk/TestZNRecordSizeLimit.java | 256 ++++++++++---------
.../zk/TestZkCacheAsyncOpSingleThread.java | 5 +-
.../zk/TestZkCacheSyncOpSingleThread.java | 8 +-
.../apache/helix/manager/zk/TestZkFlapping.java | 82 +-----
.../helix/manager/zk/TestZkReconnect.java | 23 +-
.../org/apache/helix/mock/MockZkClient.java | 3 +-
.../helix/mock/controller/MockController.java | 10 +-
.../helix/participant/MockZKHelixManager.java | 4 +-
.../store/zk/TestZkHelixPropertyStore.java | 4 +-
.../apache/helix/tools/TestClusterSetup.java | 39 ++-
.../apache/helix/rest/server/ServerContext.java | 27 +-
.../rest/server/resources/AbstractResource.java | 9 -
.../resources/helix/AbstractHelixResource.java | 10 +-
.../server/resources/helix/ClusterAccessor.java | 4 +-
.../resources/helix/ResourceAccessor.java | 8 +-
.../helix/rest/server/AbstractTestClass.java | 21 +-
60 files changed, 663 insertions(+), 578 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/controller/HierarchicalDataHolder.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/HierarchicalDataHolder.java b/helix-core/src/main/java/org/apache/helix/controller/HierarchicalDataHolder.java
index 3f3d999..27ecc40 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/HierarchicalDataHolder.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/HierarchicalDataHolder.java
@@ -26,7 +26,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,16 +42,16 @@ public class HierarchicalDataHolder<T> {
* currentVersion, gets updated when data is read from original source
*/
AtomicLong currentVersion;
- private final ZkClient _zkClient;
+ private final HelixZkClient _zkClient;
private final String _rootPath;
private final FileFilter _filter;
- public HierarchicalDataHolder(ZkClient client, String rootPath, FileFilter filter) {
+ public HierarchicalDataHolder(HelixZkClient client, String rootPath, FileFilter filter) {
this._zkClient = client;
this._rootPath = rootPath;
this._filter = filter;
// Node<T> initialValue = new Node<T>();
- root = new AtomicReference<HierarchicalDataHolder.Node<T>>();
+ root = new AtomicReference<>();
currentVersion = new AtomicLong(1);
refreshData();
}
@@ -99,7 +99,7 @@ public class HierarchicalDataHolder<T> {
Node<T> oldChild =
(oldRoot != null && oldRoot.children != null) ? oldRoot.children.get(child) : null;
if (newRoot.children == null) {
- newRoot.children = new ConcurrentHashMap<String, HierarchicalDataHolder.Node<T>>();
+ newRoot.children = new ConcurrentHashMap<>();
}
if (!newRoot.children.contains(child)) {
newRoot.children.put(child, new Node<T>());
http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/examples/IdealStateBuilderExample.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/examples/IdealStateBuilderExample.java b/helix-core/src/main/java/org/apache/helix/examples/IdealStateBuilderExample.java
index b89d830..71e6662 100644
--- a/helix-core/src/main/java/org/apache/helix/examples/IdealStateBuilderExample.java
+++ b/helix-core/src/main/java/org/apache/helix/examples/IdealStateBuilderExample.java
@@ -22,7 +22,8 @@ package org.apache.helix.examples;
import org.apache.helix.controller.HelixControllerMain;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.model.InstanceConfig;
@@ -51,10 +52,11 @@ public class IdealStateBuilderExample {
final String clusterName = args[1];
RebalanceMode idealStateMode = RebalanceMode.valueOf(args[2].toUpperCase());
- ZkClient zkclient =
- new ZkClient(zkAddr, ZkClient.DEFAULT_SESSION_TIMEOUT, ZkClient.DEFAULT_CONNECTION_TIMEOUT,
- new ZNRecordSerializer());
- ZKHelixAdmin admin = new ZKHelixAdmin(zkclient);
+ HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
+ clientConfig.setZkSerializer(new ZNRecordSerializer());
+ final HelixZkClient zkClient = SharedZkClientFactory.getInstance()
+ .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr), clientConfig);
+ ZKHelixAdmin admin = new ZKHelixAdmin(zkClient);
// add cluster
admin.addCluster(clusterName, true);
http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/examples/IdealStateExample.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/examples/IdealStateExample.java b/helix-core/src/main/java/org/apache/helix/examples/IdealStateExample.java
index 7c5192d..723cbbe 100644
--- a/helix-core/src/main/java/org/apache/helix/examples/IdealStateExample.java
+++ b/helix-core/src/main/java/org/apache/helix/examples/IdealStateExample.java
@@ -22,7 +22,8 @@ package org.apache.helix.examples;
import org.apache.helix.controller.HelixControllerMain;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.StateModelDefinition;
@@ -31,7 +32,7 @@ import org.apache.helix.tools.StateModelConfigGenerator;
/**
* Ideal state json format file used in this example for CUSTOMIZED ideal state mode
* <p>
- *
+ *
* <pre>
* {
* "id" : "TestDB",
@@ -93,10 +94,11 @@ public class IdealStateExample {
}
// add cluster {clusterName}
- ZkClient zkclient =
- new ZkClient(zkAddr, ZkClient.DEFAULT_SESSION_TIMEOUT, ZkClient.DEFAULT_CONNECTION_TIMEOUT,
- new ZNRecordSerializer());
- ZKHelixAdmin admin = new ZKHelixAdmin(zkclient);
+ HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
+ clientConfig.setZkSerializer(new ZNRecordSerializer());
+ HelixZkClient zkClient = SharedZkClientFactory.getInstance()
+ .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr), clientConfig);
+ ZKHelixAdmin admin = new ZKHelixAdmin(zkClient);
admin.addCluster(clusterName, true);
// add MasterSlave state mode definition
http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
index b6d452d..9e9d1a7 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
@@ -179,19 +179,11 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
*/
private List<NotificationContext.Type> _expectTypes = nextNotificationType.get(Type.FINALIZE);
- @Deprecated
- public CallbackHandler(HelixManager manager, ZkClient client, PropertyKey propertyKey,
+ public CallbackHandler(HelixManager manager, HelixZkClient client, PropertyKey propertyKey,
Object listener, EventType[] eventTypes, ChangeType changeType) {
this(manager, client, propertyKey, listener, eventTypes, changeType, null);
}
- @Deprecated
- public CallbackHandler(HelixManager manager, ZkClient client, PropertyKey propertyKey,
- Object listener, EventType[] eventTypes, ChangeType changeType,
- HelixCallbackMonitor monitor) {
- this(manager, (HelixZkClient) client, propertyKey, listener, eventTypes, changeType, monitor);
- }
-
public CallbackHandler(HelixManager manager, HelixZkClient client, PropertyKey propertyKey,
Object listener, EventType[] eventTypes, ChangeType changeType,
HelixCallbackMonitor monitor) {
http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
index 36fb969..28641e0 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
@@ -77,7 +77,7 @@ public class ParticipantManager {
final LiveInstanceInfoProvider _liveInstanceInfoProvider;
final List<PreConnectCallback> _preConnectCallbacks;
- public ParticipantManager(HelixManager manager, ZkClient zkclient, int sessionTimeout,
+ public ParticipantManager(HelixManager manager, HelixZkClient zkclient, int sessionTimeout,
LiveInstanceInfoProvider liveInstanceInfoProvider, List<PreConnectCallback> preConnectCallbacks) {
_zkclient = zkclient;
_manager = manager;
http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index 98e2737..c673f51 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -715,6 +715,8 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
_messagingService.getExecutor().shutdown();
// TODO reset user defined handlers only
+ // TODO Fix the issue that when connection disconnected, reset handlers will be blocked. -- JJ
+ // This is because reset logic contains ZK operations.
resetHandlers();
if (_leaderElectionHandler != null) {
@@ -902,10 +904,10 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
boolean isConnected;
do {
isConnected =
- _zkclient.waitUntilConnected(ZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
+ _zkclient.waitUntilConnected(HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
if (!isConnected) {
LOG.error("fail to connect zkserver: " + _zkAddress + " in "
- + ZkClient.DEFAULT_CONNECTION_TIMEOUT + "ms. expiredSessionId: " + _sessionId
+ + HelixZkClient.DEFAULT_CONNECTION_TIMEOUT + "ms. expiredSessionId: " + _sessionId
+ ", clusterName: " + _clusterName);
continue;
}
@@ -998,6 +1000,10 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
}
try {
+ // TODO Call disconnect in another thread.
+ // handleStateChanged is triggered in ZkClient eventThread. The disconnect logic will
+ // interrupt this thread. This issue prevents the ZkClient.close() from complete. So the
+ // client is left in a strange state.
disconnect();
} catch (Exception ex) {
LOG.error("Disconnect HelixManager is not completely done.", ex);
@@ -1094,7 +1100,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
_participantManager.reset();
}
_participantManager =
- new ParticipantManager(this, (ZkClient) _zkclient, _sessionTimeout, _liveInstanceInfoProvider,
+ new ParticipantManager(this, _zkclient, _sessionTimeout, _liveInstanceInfoProvider,
_preConnectCallbacks);
_participantManager.handleNewSession();
}
http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
index 9c18d09..4d2a93d 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
@@ -1491,6 +1491,10 @@ public class ZkClient implements Watcher {
throw new HelixException(
"Unable to connect to zookeeper server with the specified ZkConnection");
}
+ // TODO Refine the init state here. Here we pre-config it to be connected. This may not be
+ // the case, if the connection is connecting or recovering. -- JJ
+ // For shared client, the event notification will not be forwarded before wather add to the
+ // connection manager.
setCurrentState(KeeperState.SyncConnected);
}
http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index 4fe732b..0225f83 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -25,7 +25,7 @@ import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.builder.CustomModeISBuilder;
@@ -84,13 +84,13 @@ public class TaskDriver {
manager.getHelixPropertyStore(), manager.getClusterName());
}
- public TaskDriver(ZkClient client, String clusterName) {
+ public TaskDriver(HelixZkClient client, String clusterName) {
this(client, new ZkBaseDataAccessor<ZNRecord>(client), clusterName);
}
- public TaskDriver(ZkClient client, ZkBaseDataAccessor<ZNRecord> baseAccessor, String clusterName) {
+ public TaskDriver(HelixZkClient client, ZkBaseDataAccessor<ZNRecord> baseAccessor, String clusterName) {
this(new ZKHelixAdmin(client), new ZKHelixDataAccessor(clusterName, baseAccessor),
- new ZkHelixPropertyStore<ZNRecord>(baseAccessor,
+ new ZkHelixPropertyStore<>(baseAccessor,
PropertyPathBuilder.propertyStore(clusterName), null), clusterName);
}
http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java
index af80b48..7129c9f 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java
@@ -29,7 +29,7 @@ import org.apache.helix.controller.stages.ClusterEvent;
import org.apache.helix.controller.stages.ClusterEventType;
import org.apache.helix.controller.stages.CurrentStateComputationStage;
import org.apache.helix.controller.stages.ResourceComputationStage;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.Partition;
import org.slf4j.Logger;
@@ -54,7 +54,7 @@ public class ClusterExternalViewVerifier extends ClusterVerifier {
final List<String> _expectSortedLiveNodes; // always sorted
- public ClusterExternalViewVerifier(ZkClient zkclient, String clusterName,
+ public ClusterExternalViewVerifier(HelixZkClient zkclient, String clusterName,
List<String> expectLiveNodes) {
super(zkclient, clusterName);
_expectSortedLiveNodes = expectLiveNodes;
http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/tools/ClusterLiveNodesVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterLiveNodesVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterLiveNodesVerifier.java
index d1187ab..164cfcc 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterLiveNodesVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterLiveNodesVerifier.java
@@ -19,11 +19,11 @@ package org.apache.helix.tools;
* under the License.
*/
-import org.apache.helix.manager.zk.ZkClient;
-
import java.util.Collections;
import java.util.List;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+
/**
* Please use the class is in tools.ClusterVerifiers.
*/
@@ -32,7 +32,7 @@ public class ClusterLiveNodesVerifier extends ClusterVerifier {
final List<String> _expectSortedLiveNodes; // always sorted
- public ClusterLiveNodesVerifier(ZkClient zkclient, String clusterName,
+ public ClusterLiveNodesVerifier(HelixZkClient zkclient, String clusterName,
List<String> expectLiveNodes) {
super(zkclient, clusterName);
_expectSortedLiveNodes = expectLiveNodes;
http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
index 94a5f70..d21877f 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
@@ -45,7 +45,8 @@ import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ClusterConstraints;
@@ -135,22 +136,24 @@ public class ClusterSetup {
static Logger _logger = LoggerFactory.getLogger(ClusterSetup.class);
String _zkServerAddress;
- ZkClient _zkClient;
+ HelixZkClient _zkClient;
HelixAdmin _admin;
public ClusterSetup(String zkServerAddress) {
_zkServerAddress = zkServerAddress;
- _zkClient = ZKClientPool.getZkClient(_zkServerAddress);
+ _zkClient = SharedZkClientFactory.getInstance()
+ .buildZkClient(new HelixZkClient.ZkConnectionConfig(_zkServerAddress));
+ _zkClient.setZkSerializer(new ZNRecordSerializer());
_admin = new ZKHelixAdmin(_zkClient);
}
- public ClusterSetup(ZkClient zkClient) {
+ public ClusterSetup(HelixZkClient zkClient) {
_zkServerAddress = zkClient.getServers();
_zkClient = zkClient;
_admin = new ZKHelixAdmin(_zkClient);
}
- public ClusterSetup(ZkClient zkClient, HelixAdmin zkHelixAdmin) {
+ public ClusterSetup(HelixZkClient zkClient, HelixAdmin zkHelixAdmin) {
_zkServerAddress = zkClient.getServers();
_zkClient = zkClient;
_admin = zkHelixAdmin;
http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
index cc508ef..cc16ce2 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
@@ -28,6 +28,7 @@ import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import com.google.common.collect.Sets;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
@@ -57,19 +58,19 @@ import org.apache.helix.controller.stages.ClusterEventType;
import org.apache.helix.controller.stages.CurrentStateComputationStage;
import org.apache.helix.controller.stages.ResourceComputationStage;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.DedicatedZkClientFactory;
+import org.apache.helix.manager.zk.client.HelixZkClient;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.task.TaskConstants;
-import org.apache.helix.util.ZKClientPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Sets;
-
/**
* This class is deprecated, please use dedicated verifier classes, such as BestPossibleExternViewVerifier, etc, in tools.ClusterVerifiers
*/
@@ -98,10 +99,11 @@ public class ClusterStateVerifier {
@Deprecated
static class ExtViewVeriferZkListener implements IZkChildListener, IZkDataListener {
final CountDownLatch _countDown;
- final ZkClient _zkClient;
+ final HelixZkClient _zkClient;
final Verifier _verifier;
- public ExtViewVeriferZkListener(CountDownLatch countDown, ZkClient zkClient, ZkVerifier verifier) {
+ public ExtViewVeriferZkListener(CountDownLatch countDown, HelixZkClient zkClient,
+ ZkVerifier verifier) {
_countDown = countDown;
_zkClient = zkClient;
_verifier = verifier;
@@ -136,17 +138,20 @@ public class ClusterStateVerifier {
}
}
- private static ZkClient validateAndGetClient(String zkAddr, String clusterName) {
+ private static HelixZkClient validateAndGetClient(String zkAddr, String clusterName) {
if (zkAddr == null || clusterName == null) {
throw new IllegalArgumentException("requires zkAddr|clusterName");
}
- return ZKClientPool.getZkClient(zkAddr);
+ HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
+ clientConfig.setZkSerializer(new ZNRecordSerializer());
+ return DedicatedZkClientFactory.getInstance()
+ .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr), clientConfig);
}
public static class BestPossAndExtViewZkVerifier implements ZkVerifier {
private final String clusterName;
private final Map<String, Map<String, String>> errStates;
- private final ZkClient zkClient;
+ private final HelixZkClient zkClient;
private final Set<String> resources;
public BestPossAndExtViewZkVerifier(String zkAddr, String clusterName) {
@@ -163,7 +168,7 @@ public class ClusterStateVerifier {
this(validateAndGetClient(zkAddr, clusterName), clusterName, errStates, resources);
}
- public BestPossAndExtViewZkVerifier(ZkClient zkClient, String clusterName,
+ public BestPossAndExtViewZkVerifier(HelixZkClient zkClient, String clusterName,
Map<String, Map<String, String>> errStates, Set<String> resources) {
if (zkClient == null || clusterName == null) {
throw new IllegalArgumentException("requires zkClient|clusterName");
@@ -406,7 +411,7 @@ public class ClusterStateVerifier {
@Override
public ZkClient getZkClient() {
- return zkClient;
+ return (ZkClient) zkClient;
}
@Override
@@ -425,13 +430,13 @@ public class ClusterStateVerifier {
public static class MasterNbInExtViewVerifier implements ZkVerifier {
private final String clusterName;
- private final ZkClient zkClient;
+ private final HelixZkClient zkClient;
public MasterNbInExtViewVerifier(String zkAddr, String clusterName) {
this(validateAndGetClient(zkAddr, clusterName), clusterName);
}
- public MasterNbInExtViewVerifier(ZkClient zkClient, String clusterName) {
+ public MasterNbInExtViewVerifier(HelixZkClient zkClient, String clusterName) {
if (zkClient == null || clusterName == null) {
throw new IllegalArgumentException("requires zkClient|clusterName");
}
@@ -454,7 +459,7 @@ public class ClusterStateVerifier {
@Override
public ZkClient getZkClient() {
- return zkClient;
+ return (ZkClient) zkClient;
}
@Override
@@ -555,7 +560,7 @@ public class ClusterStateVerifier {
public static boolean verifyByZkCallback(ZkVerifier verifier, long timeout) {
long startTime = System.currentTimeMillis();
CountDownLatch countDown = new CountDownLatch(1);
- ZkClient zkClient = verifier.getZkClient();
+ HelixZkClient zkClient = verifier.getZkClient();
String clusterName = verifier.getClusterName();
// add an ephemeral node to /{clusterName}/CONFIGS/CLUSTER/verify
http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifier.java
index 5697bcf..d6e5a73 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifier.java
@@ -27,7 +27,7 @@ import org.apache.helix.ZNRecord;
import org.apache.helix.api.listeners.PreFetch;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,7 +42,7 @@ import java.util.concurrent.TimeUnit;
public abstract class ClusterVerifier implements IZkChildListener, IZkDataListener {
private static Logger LOG = LoggerFactory.getLogger(ClusterVerifier.class);
- protected final ZkClient _zkclient;
+ protected final HelixZkClient _zkclient;
protected final String _clusterName;
protected final HelixDataAccessor _accessor;
protected final PropertyKey.Builder _keyBuilder;
@@ -58,7 +58,7 @@ public abstract class ClusterVerifier implements IZkChildListener, IZkDataListen
}
}
- public ClusterVerifier(ZkClient zkclient, String clusterName) {
+ public ClusterVerifier(HelixZkClient zkclient, String clusterName) {
_zkclient = zkclient;
_clusterName = clusterName;
_accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkclient));
http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
index 6e73df6..3517444 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
@@ -33,6 +33,7 @@ import org.apache.helix.controller.stages.ClusterEventType;
import org.apache.helix.controller.stages.CurrentStateComputationStage;
import org.apache.helix.controller.stages.ResourceComputationStage;
import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Partition;
@@ -73,7 +74,7 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier {
_clusterDataCache = new ClusterDataCache();
}
- public BestPossibleExternalViewVerifier(ZkClient zkClient, String clusterName,
+ public BestPossibleExternalViewVerifier(HelixZkClient zkClient, String clusterName,
Set<String> resources, Map<String, Map<String, String>> errStates,
Set<String> expectLiveInstances) {
super(zkClient, clusterName);
@@ -89,7 +90,7 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier {
private Set<String> _resources;
private Set<String> _expectLiveInstances;
private String _zkAddr;
- private ZkClient _zkClient;
+ private HelixZkClient _zkClient;
public Builder(String clusterName) {
_clusterName = clusterName;
@@ -148,11 +149,15 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier {
return this;
}
- public ZkClient getZkClient() {
+ public HelixZkClient getHelixZkClient() {
return _zkClient;
}
- public Builder setZkClient(ZkClient zkClient) {
+ @Deprecated
+ public ZkClient getZkClient() {
+ return (ZkClient) getHelixZkClient();
+ }
+ public Builder setZkClient(HelixZkClient zkClient) {
_zkClient = zkClient;
return this;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ClusterLiveNodesVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ClusterLiveNodesVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ClusterLiveNodesVerifier.java
index 2a71566..b4d3862 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ClusterLiveNodesVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ClusterLiveNodesVerifier.java
@@ -24,16 +24,16 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
public class ClusterLiveNodesVerifier extends ZkHelixClusterVerifier {
final Set<String> _expectLiveNodes;
- public ClusterLiveNodesVerifier(ZkClient zkclient, String clusterName,
+ public ClusterLiveNodesVerifier(HelixZkClient zkclient, String clusterName,
List<String> expectLiveNodes) {
super(zkclient, clusterName);
- _expectLiveNodes = new HashSet<String>(expectLiveNodes);
+ _expectLiveNodes = new HashSet<>(expectLiveNodes);
}
@Override
http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java
index a1d12fa..e714789 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java
@@ -35,6 +35,7 @@ import org.apache.helix.PropertyKey;
import org.apache.helix.controller.rebalancer.AbstractRebalancer;
import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
@@ -63,7 +64,7 @@ public class StrictMatchExternalViewVerifier extends ZkHelixClusterVerifier {
_expectLiveInstances = expectLiveInstances;
}
- public StrictMatchExternalViewVerifier(ZkClient zkClient, String clusterName,
+ public StrictMatchExternalViewVerifier(HelixZkClient zkClient, String clusterName,
Set<String> resources, Set<String> expectLiveInstances) {
super(zkClient, clusterName);
_resources = resources;
@@ -75,7 +76,7 @@ public class StrictMatchExternalViewVerifier extends ZkHelixClusterVerifier {
private Set<String> _resources;
private Set<String> _expectLiveInstances;
private String _zkAddr;
- private ZkClient _zkClient;
+ private HelixZkClient _zkClient;
public StrictMatchExternalViewVerifier build() {
if (_clusterName == null || (_zkAddr == null && _zkClient == null)) {
@@ -125,11 +126,16 @@ public class StrictMatchExternalViewVerifier extends ZkHelixClusterVerifier {
return this;
}
- public ZkClient getZkClient() {
+ public HelixZkClient getHelixZkClient() {
return _zkClient;
}
- public Builder setZkClient(ZkClient zkClient) {
+ @Deprecated
+ public ZkClient getZkClient() {
+ return (ZkClient) getHelixZkClient();
+ }
+
+ public Builder setZkClient(HelixZkClient zkClient) {
_zkClient = zkClient;
return this;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
index dbf9272..9c24f51 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
@@ -29,9 +29,11 @@ import org.apache.helix.PropertyKey;
import org.apache.helix.ZNRecord;
import org.apache.helix.api.listeners.PreFetch;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.manager.zk.ZkClient;
-import org.apache.helix.util.ZKClientPool;
+import org.apache.helix.manager.zk.client.DedicatedZkClientFactory;
+import org.apache.helix.manager.zk.client.HelixZkClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,7 +48,7 @@ public abstract class ZkHelixClusterVerifier
protected static int DEFAULT_PERIOD = 100;
- protected final ZkClient _zkClient;
+ protected final HelixZkClient _zkClient;
protected final String _clusterName;
protected final HelixDataAccessor _accessor;
protected final PropertyKey.Builder _keyBuilder;
@@ -90,7 +92,7 @@ public abstract class ZkHelixClusterVerifier
}
}
- public ZkHelixClusterVerifier(ZkClient zkClient, String clusterName) {
+ public ZkHelixClusterVerifier(HelixZkClient zkClient, String clusterName) {
if (zkClient == null || clusterName == null) {
throw new IllegalArgumentException("requires zkClient|clusterName");
}
@@ -104,7 +106,9 @@ public abstract class ZkHelixClusterVerifier
if (zkAddr == null || clusterName == null) {
throw new IllegalArgumentException("requires zkAddr|clusterName");
}
- _zkClient = ZKClientPool.getZkClient(zkAddr);
+ _zkClient = DedicatedZkClientFactory.getInstance()
+ .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
+ _zkClient.setZkSerializer(new ZNRecordSerializer());
_clusterName = clusterName;
_accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
_keyBuilder = _accessor.keyBuilder();
@@ -285,10 +289,15 @@ public abstract class ZkHelixClusterVerifier
}
}
- public ZkClient getZkClient() {
+ public HelixZkClient getHelixZkClient() {
return _zkClient;
}
+ @Deprecated
+ public ZkClient getZkClient() {
+ return (ZkClient) getHelixZkClient();
+ }
+
public String getClusterName() {
return _clusterName;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/tools/TestExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/TestExecutor.java b/helix-core/src/main/java/org/apache/helix/tools/TestExecutor.java
index 908bba5..6a757f3 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/TestExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/TestExecutor.java
@@ -34,7 +34,6 @@ import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.apache.helix.HelixManager;
import org.apache.helix.ZNRecord;
import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.manager.zk.client.HelixZkClient;
import org.apache.helix.manager.zk.client.SharedZkClientFactory;
import org.apache.helix.store.PropertyJsonComparator;
@@ -536,7 +535,7 @@ public class TestExecutor {
return result;
}
- private static boolean compareAndSetZnode(ZnodeValue expect, ZnodeOpArg arg, ZkClient zkClient,
+ private static boolean compareAndSetZnode(ZnodeValue expect, ZnodeOpArg arg, HelixZkClient zkClient,
ZNRecord diff) {
String path = arg._znodePath;
ZnodePropertyType type = arg._propertyType;
@@ -639,12 +638,12 @@ public class TestExecutor {
private static class ExecuteCommand implements Runnable {
private final TestCommand _command;
private final long _startTime;
- private final ZkClient _zkClient;
+ private final HelixZkClient _zkClient;
private final CountDownLatch _countDown;
private final Map<TestCommand, Boolean> _testResults;
public ExecuteCommand(long startTime, TestCommand command, CountDownLatch countDown,
- ZkClient zkClient, Map<TestCommand, Boolean> testResults) {
+ HelixZkClient zkClient, Map<TestCommand, Boolean> testResults) {
_startTime = startTime;
_command = command;
_countDown = countDown;
@@ -735,9 +734,7 @@ public class TestExecutor {
}
_countDown.countDown();
if (_countDown.getCount() == 0) {
- if (_zkClient != null && _zkClient.getConnection() != null)
-
- {
+ if (_zkClient != null && !_zkClient.isClosed()) {
_zkClient.close();
}
}
@@ -768,7 +765,7 @@ public class TestExecutor {
TestTrigger trigger = command._trigger;
command._startTimestamp = System.currentTimeMillis() + trigger._startTime;
- new Thread(new ExecuteCommand(command._startTimestamp, command, countDown, (ZkClient) zkClient,
+ new Thread(new ExecuteCommand(command._startTimestamp, command, countDown, zkClient,
testResults)).start();
}
http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/tools/commandtools/IntegrationTestUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/commandtools/IntegrationTestUtil.java b/helix-core/src/main/java/org/apache/helix/tools/commandtools/IntegrationTestUtil.java
index dc2af8f..18f06f4 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/commandtools/IntegrationTestUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/commandtools/IntegrationTestUtil.java
@@ -35,11 +35,12 @@ import org.apache.commons.cli.ParseException;
import org.apache.helix.PropertyKey;
import org.apache.helix.ZNRecord;
import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.DedicatedZkClientFactory;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
import org.apache.helix.tools.ClusterExternalViewVerifier;
import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
import org.apache.helix.tools.ClusterVerifiers.ClusterLiveNodesVerifier;
-import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,11 +63,11 @@ public class IntegrationTestUtil {
public static final String readLeader = "readLeader";
public static final String verifyClusterState = "verifyClusterState";
- final ZkClient _zkclient;
+ final HelixZkClient _zkclient;
final ZNRecordSerializer _serializer;
final long _timeoutValue;
- public IntegrationTestUtil(ZkClient zkclient, long timeoutValue) {
+ public IntegrationTestUtil(HelixZkClient zkclient, long timeoutValue) {
_zkclient = zkclient;
_timeoutValue = timeoutValue;
_serializer = new ZNRecordSerializer();
@@ -213,10 +214,10 @@ public class IntegrationTestUtil {
System.exit(1);
}
- String zkServer = cmd.getOptionValue(zkSvr);
- ZkClient zkclient =
- new ZkClient(zkServer, ZkClient.DEFAULT_SESSION_TIMEOUT,
- ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+ HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
+ clientConfig.setZkSerializer(new ZNRecordSerializer());
+ HelixZkClient zkClient = DedicatedZkClientFactory.getInstance()
+ .buildZkClient(new HelixZkClient.ZkConnectionConfig(cmd.getOptionValue(zkSvr)), clientConfig);
long timeoutValue = DEFAULT_TIMEOUT;
if (cmd.hasOption(timeout)) {
@@ -229,7 +230,7 @@ public class IntegrationTestUtil {
}
}
- IntegrationTestUtil util = new IntegrationTestUtil(zkclient, timeoutValue);
+ IntegrationTestUtil util = new IntegrationTestUtil(zkClient, timeoutValue);
if (cmd != null) {
if (cmd.hasOption(verifyExternalView)) {
http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/main/java/org/apache/helix/util/ZKClientPool.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/util/ZKClientPool.java b/helix-core/src/main/java/org/apache/helix/util/ZKClientPool.java
index 0980e48..3350d57 100644
--- a/helix-core/src/main/java/org/apache/helix/util/ZKClientPool.java
+++ b/helix-core/src/main/java/org/apache/helix/util/ZKClientPool.java
@@ -27,7 +27,7 @@ import org.apache.helix.manager.zk.ZkClient;
import org.apache.zookeeper.ZooKeeper.States;
public class ZKClientPool {
- static final Map<String, ZkClient> _zkClientMap = new ConcurrentHashMap<String, ZkClient>();
+ static final Map<String, ZkClient> _zkClientMap = new ConcurrentHashMap<>();
static final int DEFAULT_SESSION_TIMEOUT = 30 * 1000;
public static ZkClient getZkClient(String zkServer) {
http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/test/java/org/apache/helix/TestHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestHelper.java b/helix-core/src/test/java/org/apache/helix/TestHelper.java
index 2f25e30..edc0646 100644
--- a/helix-core/src/test/java/org/apache/helix/TestHelper.java
+++ b/helix-core/src/test/java/org/apache/helix/TestHelper.java
@@ -37,6 +37,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+
import org.I0Itec.zkclient.IDefaultNameSpace;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
@@ -50,7 +51,8 @@ import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState.RebalanceMode;
@@ -144,7 +146,7 @@ public class TestHelper {
}
}
- public static void setupEmptyCluster(ZkClient zkClient, String clusterName) {
+ public static void setupEmptyCluster(HelixZkClient zkClient, String clusterName) {
ZKHelixAdmin admin = new ZKHelixAdmin(zkClient);
admin.addCluster(clusterName, true);
}
@@ -211,7 +213,8 @@ public class TestHelper {
public static boolean verifyEmptyCurStateAndExtView(String clusterName, String resourceName,
Set<String> instanceNames, String zkAddr) {
- ZkClient zkClient = new ZkClient(zkAddr);
+ HelixZkClient zkClient = SharedZkClientFactory.getInstance()
+ .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
zkClient.setZkSerializer(new ZNRecordSerializer());
try {
@@ -257,17 +260,18 @@ public class TestHelper {
RebalanceMode.SEMI_AUTO, doRebalance);
}
- public static void setupCluster(String clusterName, String ZkAddr, int startPort,
+ public static void setupCluster(String clusterName, String zkAddr, int startPort,
String participantNamePrefix, String resourceNamePrefix, int resourceNb, int partitionNb,
int nodesNb, int replica, String stateModelDef, RebalanceMode mode, boolean doRebalance)
throws Exception {
- ZkClient zkClient = new ZkClient(ZkAddr);
+ HelixZkClient zkClient = SharedZkClientFactory.getInstance()
+ .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
if (zkClient.exists("/" + clusterName)) {
LOG.warn("Cluster already exists:" + clusterName + ". Deleting it");
zkClient.deleteRecursively("/" + clusterName);
}
- ClusterSetup setupTool = new ClusterSetup(ZkAddr);
+ ClusterSetup setupTool = new ClusterSetup(zkAddr);
setupTool.addCluster(clusterName, true);
for (int i = 0; i < nodesNb; i++) {
@@ -286,12 +290,12 @@ public class TestHelper {
zkClient.close();
}
- public static void dropCluster(String clusterName, ZkClient zkClient) throws Exception {
+ public static void dropCluster(String clusterName, HelixZkClient zkClient) throws Exception {
ClusterSetup setupTool = new ClusterSetup(zkClient);
dropCluster(clusterName, zkClient, setupTool);
}
- public static void dropCluster(String clusterName, ZkClient zkClient, ClusterSetup setup) {
+ public static void dropCluster(String clusterName, HelixZkClient zkClient, ClusterSetup setup) {
String namespace = "/" + clusterName;
if (zkClient.exists(namespace)) {
try {
@@ -310,7 +314,8 @@ public class TestHelper {
*/
public static void verifyState(String clusterName, String zkAddr,
Map<String, Set<String>> stateMap, String state) {
- ZkClient zkClient = new ZkClient(zkAddr);
+ HelixZkClient zkClient = SharedZkClientFactory.getInstance()
+ .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
zkClient.setZkSerializer(new ZNRecordSerializer());
try {
@@ -513,7 +518,7 @@ public class TestHelper {
System.out.println("END:Print cache");
}
- public static void readZkRecursive(String path, Map<String, ZNode> map, ZkClient zkclient) {
+ public static void readZkRecursive(String path, Map<String, ZNode> map, HelixZkClient zkclient) {
try {
Stat stat = new Stat();
ZNRecord record = zkclient.readData(path, stat);
@@ -554,7 +559,7 @@ public class TestHelper {
}
public static boolean verifyZkCache(List<String> paths, BaseDataAccessor<ZNRecord> zkAccessor,
- ZkClient zkclient, boolean needVerifyStat) {
+ HelixZkClient zkclient, boolean needVerifyStat) {
// read everything
Map<String, ZNode> zkMap = new HashMap<String, ZNode>();
Map<String, ZNode> cache = new HashMap<String, ZNode>();
@@ -568,12 +573,12 @@ public class TestHelper {
}
public static boolean verifyZkCache(List<String> paths, Map<String, ZNode> cache,
- ZkClient zkclient, boolean needVerifyStat) {
+ HelixZkClient zkclient, boolean needVerifyStat) {
return verifyZkCache(paths, null, cache, zkclient, needVerifyStat);
}
public static boolean verifyZkCache(List<String> paths, List<String> pathsExcludeForStat,
- Map<String, ZNode> cache, ZkClient zkclient, boolean needVerifyStat) {
+ Map<String, ZNode> cache, HelixZkClient zkclient, boolean needVerifyStat) {
// read everything on zk under paths
Map<String, ZNode> zkMap = new HashMap<String, ZNode>();
for (String path : paths) {
@@ -799,7 +804,7 @@ public class TestHelper {
return sb.toString();
}
- public static void printZkListeners(ZkClient client) throws Exception {
+ public static void printZkListeners(HelixZkClient client) throws Exception {
Map<String, Set<IZkDataListener>> datalisteners = ZkTestHelper.getZkDataListener(client);
Map<String, Set<IZkChildListener>> childListeners = ZkTestHelper.getZkChildListener(client);
http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/test/java/org/apache/helix/TestHierarchicalDataStore.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestHierarchicalDataStore.java b/helix-core/src/test/java/org/apache/helix/TestHierarchicalDataStore.java
index 07aa4f4..bd123be 100644
--- a/helix-core/src/test/java/org/apache/helix/TestHierarchicalDataStore.java
+++ b/helix-core/src/test/java/org/apache/helix/TestHierarchicalDataStore.java
@@ -20,27 +20,29 @@ package org.apache.helix;
*/
import java.io.FileFilter;
+
import org.apache.helix.controller.HierarchicalDataHolder;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;
public class TestHierarchicalDataStore extends ZkUnitTestBase {
- protected static ZkClient _zkClientString = null;
+ protected static HelixZkClient _zkClient = null;
- @Test(groups = {
- "unitTest"
+ @Test(groups = { "unitTest"
})
+
public void testHierarchicalDataStore() {
- _zkClientString = new ZkClient(ZK_ADDR, 1000, 3000);
+ _zkClient = SharedZkClientFactory.getInstance()
+ .buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR));
String path = "/tmp/testHierarchicalDataStore";
FileFilter filter = null;
- // _zkClient.setZkSerializer(new ZNRecordSerializer());
- _zkClientString.deleteRecursively(path);
+ _zkClient.deleteRecursively(path);
HierarchicalDataHolder<ZNRecord> dataHolder =
- new HierarchicalDataHolder<ZNRecord>(_zkClientString, path, filter);
+ new HierarchicalDataHolder<ZNRecord>(_zkClient, path, filter);
dataHolder.print();
AssertJUnit.assertFalse(dataHolder.refreshData());
@@ -69,12 +71,12 @@ public class TestHierarchicalDataStore extends ZkUnitTestBase {
}
private void set(String path, String data) {
- _zkClientString.writeData(path, data);
+ _zkClient.writeData(path, data);
}
private void add(String path, String data) {
- _zkClientString.createPersistent(path, true);
- _zkClientString.writeData(path, data);
+ _zkClient.createPersistent(path, true);
+ _zkClient.writeData(path, data);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/test/java/org/apache/helix/TestZKCallback.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestZKCallback.java b/helix-core/src/test/java/org/apache/helix/TestZKCallback.java
index ee0a7c7..b80e4d6 100644
--- a/helix-core/src/test/java/org/apache/helix/TestZKCallback.java
+++ b/helix-core/src/test/java/org/apache/helix/TestZKCallback.java
@@ -29,8 +29,6 @@ import org.apache.helix.api.listeners.ExternalViewChangeListener;
import org.apache.helix.api.listeners.IdealStateChangeListener;
import org.apache.helix.api.listeners.LiveInstanceChangeListener;
import org.apache.helix.api.listeners.MessageListener;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/test/java/org/apache/helix/TestZkBasis.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestZkBasis.java b/helix-core/src/test/java/org/apache/helix/TestZkBasis.java
index 8b25214..5ef27a5 100644
--- a/helix-core/src/test/java/org/apache/helix/TestZkBasis.java
+++ b/helix-core/src/test/java/org/apache/helix/TestZkBasis.java
@@ -20,6 +20,7 @@ package org.apache.helix;
*/
import java.util.Collections;
+import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -29,6 +30,7 @@ import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -64,6 +66,83 @@ public class TestZkBasis extends ZkUnitTestBase {
}
}
+
+ @Test
+ public void testZkSessionExpiry() throws Exception {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ ZkClient client =
+ new ZkClient(ZK_ADDR, HelixZkClient.DEFAULT_SESSION_TIMEOUT,
+ HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+
+ String path = String.format("/%s", clusterName);
+ client.createEphemeral(path);
+ String oldSessionId = ZkTestHelper.getSessionId(client);
+ ZkTestHelper.expireSession(client);
+ String newSessionId = ZkTestHelper.getSessionId(client);
+ Assert.assertNotSame(newSessionId, oldSessionId);
+ Assert.assertFalse(client.exists(path), "Ephemeral znode should be gone after session expiry");
+ client.close();
+ System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ }
+
+ @Test
+ public void testCloseZkClient() {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ ZkClient client =
+ new ZkClient(ZK_ADDR, HelixZkClient.DEFAULT_SESSION_TIMEOUT,
+ HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+ String path = String.format("/%s", clusterName);
+ client.createEphemeral(path);
+
+ client.close();
+ Assert.assertFalse(_gZkClient.exists(path), "Ephemeral node: " + path
+ + " should be removed after ZkClient#close()");
+ System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ }
+
+ @Test
+ public void testCloseZkClientInZkClientEventThread() throws Exception {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ final CountDownLatch waitCallback = new CountDownLatch(1);
+ final ZkClient client =
+ new ZkClient(ZK_ADDR, HelixZkClient.DEFAULT_SESSION_TIMEOUT,
+ HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+ String path = String.format("/%s", clusterName);
+ client.createEphemeral(path);
+ client.subscribeDataChanges(path, new IZkDataListener() {
+
+ @Override
+ public void handleDataDeleted(String dataPath) throws Exception {
+ }
+
+ @Override
+ public void handleDataChange(String dataPath, Object data) throws Exception {
+ client.close();
+ waitCallback.countDown();
+ }
+ });
+
+ client.writeData(path, new ZNRecord("test"));
+ waitCallback.await();
+ Assert.assertFalse(_gZkClient.exists(path), "Ephemeral node: " + path
+ + " should be removed after ZkClient#close() in its own event-thread");
+
+ System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+ }
+
/**
* test zk watchers are renewed automatically after session expiry
* zookeeper-client side keeps all registered watchers see ZooKeeper.WatchRegistration.register()
@@ -76,14 +155,13 @@ public class TestZkBasis extends ZkUnitTestBase {
*/
@Test
public void testWatchRenew() throws Exception {
-
String className = TestHelper.getTestClassName();
String methodName = TestHelper.getTestMethodName();
String testName = className + "_" + methodName;
final ZkClient client =
- new ZkClient(ZK_ADDR, ZkClient.DEFAULT_SESSION_TIMEOUT,
- ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+ new ZkClient(ZK_ADDR, HelixZkClient.DEFAULT_SESSION_TIMEOUT,
+ HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
// make sure "/testName/test" doesn't exist
final String path = "/" + testName + "/test";
client.delete(path);
@@ -127,8 +205,8 @@ public class TestZkBasis extends ZkUnitTestBase {
String testName = className + "_" + methodName;
final ZkClient client =
- new ZkClient(ZK_ADDR, ZkClient.DEFAULT_SESSION_TIMEOUT,
- ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+ new ZkClient(ZK_ADDR, HelixZkClient.DEFAULT_SESSION_TIMEOUT,
+ HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
// make sure "/testName/test" doesn't exist
final String path = "/" + testName + "/test";
client.createPersistent(path, true);
http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/test/java/org/apache/helix/TestZnodeModify.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestZnodeModify.java b/helix-core/src/test/java/org/apache/helix/TestZnodeModify.java
index b898b8c..e5851f5 100644
--- a/helix-core/src/test/java/org/apache/helix/TestZnodeModify.java
+++ b/helix-core/src/test/java/org/apache/helix/TestZnodeModify.java
@@ -25,7 +25,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.model.IdealState.IdealStateProperty;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.tools.TestCommand;
@@ -210,10 +209,8 @@ public class TestZnodeModify extends ZkUnitTestBase {
public void run() {
try {
Thread.sleep(3000);
- final ZkClient zkClient = new ZkClient(ZK_ADDR);
- zkClient.setZkSerializer(new ZNRecordSerializer());
- zkClient.createPersistent(pathChild1, true);
- zkClient.writeData(pathChild1, record);
+ _gZkClient.createPersistent(pathChild1, true);
+ _gZkClient.writeData(pathChild1, record);
} catch (InterruptedException e) {
logger.error("Interrupted sleep", e);
}
@@ -228,28 +225,19 @@ public class TestZnodeModify extends ZkUnitTestBase {
}
- ZkClient _zkClient;
-
@BeforeClass()
public void beforeClass() {
System.out.println("START " + getShortClassName() + " at "
+ new Date(System.currentTimeMillis()));
-
- _zkClient = new ZkClient(ZK_ADDR);
- _zkClient.setZkSerializer(new ZNRecordSerializer());
- if (_zkClient.exists(PREFIX)) {
- _zkClient.deleteRecursively(PREFIX);
+ if (_gZkClient.exists(PREFIX)) {
+ _gZkClient.deleteRecursively(PREFIX);
}
-
}
@AfterClass
public void afterClass() {
- _zkClient.close();
-
System.out
.println("END " + getShortClassName() + " at " + new Date(System.currentTimeMillis()));
-
}
private ZNRecord getExampleZNRecord() {
@@ -267,5 +255,4 @@ public class TestZnodeModify extends ZkUnitTestBase {
record.setListField("TestDB_0", list);
return record;
}
-
}
http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java b/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
index 32d1085..19cd2e8 100644
--- a/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
+++ b/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
@@ -32,6 +32,10 @@ import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
@@ -41,6 +45,7 @@ import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
import org.apache.helix.model.ExternalView;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
@@ -54,6 +59,7 @@ import org.testng.Assert;
public class ZkTestHelper {
private static Logger LOG = LoggerFactory.getLogger(ZkTestHelper.class);
+ private static ExecutorService _executor = Executors.newSingleThreadExecutor();
static {
// Logger.getRootLogger().setLevel(Level.DEBUG);
@@ -62,11 +68,12 @@ public class ZkTestHelper {
/**
* Simulate a zk state change by calling {@link ZkClient#process(WatchedEvent)} directly
*/
- public static void simulateZkStateReconnected(ZkClient client) {
+ public static void simulateZkStateReconnected(HelixZkClient client) {
+ ZkClient zkClient = (ZkClient) client;
WatchedEvent event = new WatchedEvent(EventType.None, KeeperState.Disconnected, null);
- client.process(event);
+ zkClient.process(event);
event = new WatchedEvent(EventType.None, KeeperState.SyncConnected, null);
- client.process(event);
+ zkClient.process(event);
}
/**
@@ -74,19 +81,20 @@ public class ZkTestHelper {
* @param client
* @return
*/
- public static String getSessionId(ZkClient client) {
- ZkConnection connection = ((ZkConnection) client.getConnection());
+ public static String getSessionId(HelixZkClient client) {
+ ZkConnection connection = (ZkConnection) ((ZkClient) client).getConnection();
ZooKeeper curZookeeper = connection.getZookeeper();
return Long.toHexString(curZookeeper.getSessionId());
}
/**
* Expire current zk session and wait for {@link IZkStateListener#handleNewSession()} invoked
- * @param zkClient
+ * @param client
* @throws Exception
*/
- public static void disconnectSession(final ZkClient zkClient) throws Exception {
+ public static void disconnectSession(HelixZkClient client) throws Exception {
+ final ZkClient zkClient = (ZkClient) client;
IZkStateListener listener = new IZkStateListener() {
@Override
public void handleStateChanged(KeeperState state) throws Exception {
@@ -96,7 +104,7 @@ public class ZkTestHelper {
@Override
public void handleNewSession() throws Exception {
// make sure zkclient is connected again
- zkClient.waitUntilConnected(ZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.SECONDS);
+ zkClient.waitUntilConnected(HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.SECONDS);
ZkConnection connection = ((ZkConnection) zkClient.getConnection());
ZooKeeper curZookeeper = connection.getZookeeper();
@@ -138,8 +146,9 @@ public class ZkTestHelper {
LOG.info("After expiry. sessionId: " + Long.toHexString(curZookeeper.getSessionId()));
}
- public static void expireSession(final ZkClient zkClient) throws Exception {
+ public static void expireSession(HelixZkClient client) throws Exception {
final CountDownLatch waitNewSession = new CountDownLatch(1);
+ final ZkClient zkClient = (ZkClient) client;
IZkStateListener listener = new IZkStateListener() {
@Override
@@ -150,7 +159,7 @@ public class ZkTestHelper {
@Override
public void handleNewSession() throws Exception {
// make sure zkclient is connected again
- zkClient.waitUntilConnected(ZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.SECONDS);
+ zkClient.waitUntilConnected(HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.SECONDS);
ZkConnection connection = ((ZkConnection) zkClient.getConnection());
ZooKeeper curZookeeper = connection.getZookeeper();
@@ -204,10 +213,11 @@ public class ZkTestHelper {
/**
* expire zk session asynchronously
- * @param zkClient
+ * @param client
* @throws Exception
*/
- public static void asyncExpireSession(final ZkClient zkClient) throws Exception {
+ public static void asyncExpireSession(HelixZkClient client) throws Exception {
+ final ZkClient zkClient = (ZkClient) client;
ZkConnection connection = ((ZkConnection) zkClient.getConnection());
ZooKeeper curZookeeper = connection.getZookeeper();
LOG.info("Before expiry. sessionId: " + Long.toHexString(curZookeeper.getSessionId()));
@@ -238,7 +248,7 @@ public class ZkTestHelper {
/*
* stateMap: partition->instance->state
*/
- public static boolean verifyState(ZkClient zkclient, String clusterName, String resourceName,
+ public static boolean verifyState(HelixZkClient zkclient, String clusterName, String resourceName,
Map<String, Map<String, String>> expectStateMap, String op) {
boolean result = true;
ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(zkclient);
@@ -382,9 +392,11 @@ public class ZkTestHelper {
}
}
- public static Map<String, List<String>> getZkWatch(ZkClient client) throws Exception {
+ public static Map<String, List<String>> getZkWatch(HelixZkClient client) throws Exception {
Map<String, List<String>> lists = new HashMap<String, List<String>>();
- ZkConnection connection = ((ZkConnection) client.getConnection());
+ ZkClient zkClient = (ZkClient) client;
+
+ ZkConnection connection = ((ZkConnection) zkClient.getConnection());
ZooKeeper zk = connection.getZookeeper();
java.lang.reflect.Field field = getField(zk.getClass(), "watchManager");
@@ -406,14 +418,14 @@ public class ZkTestHelper {
HashMap<String, Set<Watcher>> childWatches =
(HashMap<String, Set<Watcher>>) field2.get(watchManager);
- lists.put("dataWatches", new ArrayList<String>(dataWatches.keySet()));
- lists.put("existWatches", new ArrayList<String>(existWatches.keySet()));
- lists.put("childWatches", new ArrayList<String>(childWatches.keySet()));
+ lists.put("dataWatches", new ArrayList<>(dataWatches.keySet()));
+ lists.put("existWatches", new ArrayList<>(existWatches.keySet()));
+ lists.put("childWatches", new ArrayList<>(childWatches.keySet()));
return lists;
}
- public static Map<String, Set<IZkDataListener>> getZkDataListener(ZkClient client)
+ public static Map<String, Set<IZkDataListener>> getZkDataListener(HelixZkClient client)
throws Exception {
java.lang.reflect.Field field = getField(client.getClass(), "_dataListener");
field.setAccessible(true);
@@ -422,7 +434,7 @@ public class ZkTestHelper {
return dataListener;
}
- public static Map<String, Set<IZkChildListener>> getZkChildListener(ZkClient client)
+ public static Map<String, Set<IZkChildListener>> getZkChildListener(HelixZkClient client)
throws Exception {
java.lang.reflect.Field field = getField(client.getClass(), "_childListener");
field.setAccessible(true);
@@ -431,7 +443,7 @@ public class ZkTestHelper {
return childListener;
}
- public static boolean tryWaitZkEventsCleaned(ZkClient zkclient) throws Exception {
+ public static boolean tryWaitZkEventsCleaned(HelixZkClient zkclient) throws Exception {
java.lang.reflect.Field field = getField(zkclient.getClass(), "_eventThread");
field.setAccessible(true);
Object eventThread = field.get(zkclient);
@@ -456,4 +468,18 @@ public class ZkTestHelper {
}
return false;
}
+
+ public static void injectExpire(HelixZkClient client)
+ throws ExecutionException, InterruptedException {
+ final ZkClient zkClient = (ZkClient) client;
+ Future future = _executor.submit(new Runnable() {
+ @Override
+ public void run() {
+ WatchedEvent event =
+ new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Expired, null);
+ zkClient.process(event);
+ }
+ });
+ future.get();
+ }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
index c69744e..b0c44e1 100644
--- a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.Level;
+
import org.I0Itec.zkclient.IZkStateListener;
import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.ZkServer;
@@ -56,6 +57,8 @@ import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.DedicatedZkClientFactory;
+import org.apache.helix.manager.zk.client.HelixZkClient;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ConfigScope;
@@ -72,7 +75,6 @@ import org.apache.helix.model.builder.ConfigScopeBuilder;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.helix.tools.StateModelConfigGenerator;
-import org.apache.helix.util.ZKClientPool;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
@@ -90,7 +92,7 @@ public class ZkTestBase {
private static Logger LOG = LoggerFactory.getLogger(ZkTestBase.class);
protected static ZkServer _zkServer;
- protected static ZkClient _gZkClient;
+ protected static HelixZkClient _gZkClient;
protected static ClusterSetup _gSetupTool;
protected static BaseDataAccessor<ZNRecord> _baseAccessor;
@@ -113,17 +115,17 @@ public class ZkTestBase {
_zkServer = TestHelper.startZkServer(ZK_ADDR);
AssertJUnit.assertTrue(_zkServer != null);
- ZKClientPool.reset();
- _gZkClient = new ZkClient(ZK_ADDR);
- _gZkClient.setZkSerializer(new ZNRecordSerializer());
+ HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
+ clientConfig.setZkSerializer(new ZNRecordSerializer());
+ _gZkClient = DedicatedZkClientFactory.getInstance()
+ .buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR), clientConfig);
_gSetupTool = new ClusterSetup(_gZkClient);
_baseAccessor = new ZkBaseDataAccessor<>(_gZkClient);
}
@AfterSuite
public void afterSuite() {
- ZKClientPool.reset();
_gZkClient.close();
TestHelper.stopZkServer(_zkServer);
}
@@ -149,7 +151,7 @@ public class ZkTestBase {
return this.getClass().getSimpleName();
}
- protected String getCurrentLeader(ZkClient zkClient, String clusterName) {
+ protected String getCurrentLeader(HelixZkClient zkClient, String clusterName) {
ZKHelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient));
Builder keyBuilder = accessor.keyBuilder();
@@ -161,7 +163,7 @@ public class ZkTestBase {
return leader.getInstanceName();
}
- protected void stopCurrentLeader(ZkClient zkClient, String clusterName,
+ protected void stopCurrentLeader(HelixZkClient zkClient, String clusterName,
Map<String, Thread> threadMap, Map<String, HelixManager> managerMap) {
String leader = getCurrentLeader(zkClient, clusterName);
Assert.assertTrue(leader != null);
@@ -202,7 +204,7 @@ public class ZkTestBase {
new ConfigAccessor(_gZkClient).set(scope, "healthChange" + ".enabled", "" + true);
}
- protected void enablePersistBestPossibleAssignment(ZkClient zkClient, String clusterName,
+ protected void enablePersistBestPossibleAssignment(HelixZkClient zkClient, String clusterName,
Boolean enabled) {
ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
@@ -210,7 +212,7 @@ public class ZkTestBase {
configAccessor.setClusterConfig(clusterName, clusterConfig);
}
- protected void enablePersistIntermediateAssignment(ZkClient zkClient, String clusterName,
+ protected void enablePersistIntermediateAssignment(HelixZkClient zkClient, String clusterName,
Boolean enabled) {
ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
@@ -218,7 +220,7 @@ public class ZkTestBase {
configAccessor.setClusterConfig(clusterName, clusterConfig);
}
- protected void enableTopologyAwareRebalance(ZkClient zkClient, String clusterName,
+ protected void enableTopologyAwareRebalance(HelixZkClient zkClient, String clusterName,
Boolean enabled) {
ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
@@ -226,7 +228,7 @@ public class ZkTestBase {
configAccessor.setClusterConfig(clusterName, clusterConfig);
}
- protected void enableDelayRebalanceInCluster(ZkClient zkClient, String clusterName,
+ protected void enableDelayRebalanceInCluster(HelixZkClient zkClient, String clusterName,
boolean enabled) {
ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
@@ -234,7 +236,7 @@ public class ZkTestBase {
configAccessor.setClusterConfig(clusterName, clusterConfig);
}
- protected void enableDelayRebalanceInInstance(ZkClient zkClient, String clusterName,
+ protected void enableDelayRebalanceInInstance(HelixZkClient zkClient, String clusterName,
String instanceName, boolean enabled) {
ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
InstanceConfig instanceConfig = configAccessor.getInstanceConfig(clusterName, instanceName);
@@ -274,7 +276,7 @@ public class ZkTestBase {
}
}
- protected void setDelayTimeInCluster(ZkClient zkClient, String clusterName, long delay) {
+ protected void setDelayTimeInCluster(HelixZkClient zkClient, String clusterName, long delay) {
ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
clusterConfig.setRebalanceDelayTime(delay);
@@ -412,7 +414,7 @@ public class ZkTestBase {
stage.postProcess();
}
- public void verifyInstance(ZkClient zkClient, String clusterName, String instance,
+ public void verifyInstance(HelixZkClient zkClient, String clusterName, String instance,
boolean wantExists) {
// String instanceConfigsPath = HelixUtil.getConfigPath(clusterName);
String instanceConfigsPath = PropertyPathBuilder.instanceConfig(clusterName);
@@ -422,13 +424,13 @@ public class ZkTestBase {
AssertJUnit.assertEquals(wantExists, zkClient.exists(instancePath));
}
- public void verifyResource(ZkClient zkClient, String clusterName, String resource,
+ public void verifyResource(HelixZkClient zkClient, String clusterName, String resource,
boolean wantExists) {
String resourcePath = PropertyPathBuilder.idealState(clusterName, resource);
AssertJUnit.assertEquals(wantExists, zkClient.exists(resourcePath));
}
- public void verifyEnabled(ZkClient zkClient, String clusterName, String instance,
+ public void verifyEnabled(HelixZkClient zkClient, String clusterName, String instance,
boolean wantEnabled) {
ZKHelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient));
@@ -438,7 +440,7 @@ public class ZkTestBase {
AssertJUnit.assertEquals(wantEnabled, config.getInstanceEnabled());
}
- public void verifyReplication(ZkClient zkClient, String clusterName, String resource, int repl) {
+ public void verifyReplication(HelixZkClient zkClient, String clusterName, String resource, int repl) {
ZKHelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient));
Builder keyBuilder = accessor.keyBuilder();
@@ -476,8 +478,10 @@ public class ZkTestBase {
LOG.info("After session expiry sessionId = " + oldZookeeper.getSessionId());
}
- protected void simulateSessionExpiry(ZkClient zkClient)
+ protected void simulateSessionExpiry(HelixZkClient client)
throws IOException, InterruptedException, IOException {
+ ZkClient zkClient = (ZkClient) client;
+
IZkStateListener listener = new IZkStateListener() {
@Override
public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
@@ -667,17 +671,21 @@ public class ZkTestBase {
protected static class EmptyZkVerifier implements ClusterStateVerifier.ZkVerifier {
private final String _clusterName;
private final String _resourceName;
- private final ZkClient _zkClient;
+ private final HelixZkClient _zkClient;
/**
* Instantiate the verifier
- * @param clusterName the cluster to verify
+ *
+ * @param clusterName the cluster to verify
* @param resourceName the resource to verify
*/
public EmptyZkVerifier(String clusterName, String resourceName) {
_clusterName = clusterName;
_resourceName = resourceName;
- _zkClient = ZKClientPool.getZkClient(ZK_ADDR);
+
+ _zkClient = DedicatedZkClientFactory.getInstance()
+ .buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR));
+ _zkClient.setZkSerializer(new ZNRecordSerializer());
}
@Override
@@ -717,7 +725,7 @@ public class ZkTestBase {
@Override
public ZkClient getZkClient() {
- return _zkClient;
+ return (ZkClient) _gZkClient;
}
@Override
http://git-wip-us.apache.org/repos/asf/helix/blob/9d7364d7/helix-core/src/test/java/org/apache/helix/integration/TestCorrectnessOnConnectivityLoss.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCorrectnessOnConnectivityLoss.java b/helix-core/src/test/java/org/apache/helix/integration/TestCorrectnessOnConnectivityLoss.java
index 3554207..a78e7bf 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestCorrectnessOnConnectivityLoss.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestCorrectnessOnConnectivityLoss.java
@@ -20,6 +20,8 @@ package org.apache.helix.integration;
*/
import com.google.common.collect.Maps;
+
+import java.lang.reflect.Method;
import java.util.Map;
import org.I0Itec.zkclient.ZkServer;
import org.apache.helix.HelixManager;
@@ -50,11 +52,11 @@ public class TestCorrectnessOnConnectivityLoss {
private ClusterControllerManager _controller;
@BeforeMethod
- public void beforeMethod() throws Exception {
- _zkServer = TestHelper.startZkServer(ZK_ADDR);
+ public void beforeMethod(Method testMethod) throws Exception {
+ _zkServer = TestHelper.startZkServer(ZK_ADDR, null, false);
String className = TestHelper.getTestClassName();
- String methodName = TestHelper.getTestMethodName();
+ String methodName = testMethod.getName();
_clusterName = className + "_" + methodName;
TestHelper.setupCluster(_clusterName, ZK_ADDR, 12918, // participant start port
"localhost", // participant host
@@ -71,6 +73,11 @@ public class TestCorrectnessOnConnectivityLoss {
_controller.connect();
}
+ @AfterMethod
+ public void afterMethod() {
+ TestHelper.stopZkServer(_zkServer);
+ }
+
@Test
public void testParticipant() throws Exception {
Map<String, Integer> stateReachedCounts = Maps.newHashMap();
@@ -136,11 +143,6 @@ public class TestCorrectnessOnConnectivityLoss {
}
}
- @AfterMethod
- public void afterMethod() throws Exception {
- TestHelper.stopZkServer(_zkServer);
- }
-
@StateModelInfo(initialState = "OFFLINE", states = {
"MASTER", "SLAVE", "OFFLINE", "ERROR"
})