You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/10/30 00:40:33 UTC
[1/2] helix git commit: Introduce Helix ZkClient factory. And use the
factory to generate new ZkClient in the critical Helix components.
Repository: helix
Updated Branches:
refs/heads/master 281f5d1ec -> 7bb55742e
http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
index b57ca87..9c18d09 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
@@ -57,6 +57,7 @@ import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
@@ -70,8 +71,8 @@ public class ZkClient implements Watcher {
private static Logger LOG = LoggerFactory.getLogger(ZkClient.class);
private static long MAX_RECONNECT_INTERVAL_MS = 30000; // 30 seconds
- protected final IZkConnection _connection;
- protected final long _operationRetryTimeoutInMillis;
+ private final IZkConnection _connection;
+ private final long _operationRetryTimeoutInMillis;
private final Map<String, Set<IZkChildListener>> _childListener =
new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Set<IZkDataListenerEntry>> _dataListener =
@@ -87,7 +88,6 @@ public class ZkClient implements Watcher {
private PathBasedZkSerializer _pathBasedZkSerializer;
private ZkClientMonitor _monitor;
-
private class IZkDataListenerEntry {
final IZkDataListener _dataListener;
final boolean _prefetchData;
@@ -130,7 +130,6 @@ public class ZkClient implements Watcher {
}
}
-
protected ZkClient(IZkConnection zkConnection, int connectionTimeout, long operationRetryTimeout,
PathBasedZkSerializer zkSerializer, String monitorType, String monitorKey,
String monitorInstanceName, boolean monitorRootPathOnly) {
@@ -140,6 +139,7 @@ public class ZkClient implements Watcher {
_connection = zkConnection;
_pathBasedZkSerializer = zkSerializer;
_operationRetryTimeoutInMillis = operationRetryTimeout;
+
connect(connectionTimeout, this);
// initiate monitor
@@ -510,7 +510,7 @@ public class ZkClient implements Watcher {
String actualPath = retryUntilConnected(new Callable<String>() {
@Override
public String call() throws Exception {
- return _connection.create(path, data, acl, mode);
+ return getConnection().create(path, data, acl, mode);
}
});
record(path, data, startT, ZkClientMonitor.AccessType.WRITE);
@@ -695,7 +695,7 @@ public class ZkClient implements Watcher {
List<String> children = retryUntilConnected(new Callable<List<String>>() {
@Override
public List<String> call() throws Exception {
- return _connection.getChildren(path, watch);
+ return getConnection().getChildren(path, watch);
}
});
record(path, null, startT, ZkClientMonitor.AccessType.READ);
@@ -737,7 +737,7 @@ public class ZkClient implements Watcher {
boolean exists = retryUntilConnected(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
- return _connection.exists(path, watch);
+ return getConnection().exists(path, watch);
}
});
record(path, null, startT, ZkClientMonitor.AccessType.READ);
@@ -759,7 +759,7 @@ public class ZkClient implements Watcher {
Stat stat = retryUntilConnected(new Callable<Stat>() {
@Override
public Stat call() throws Exception {
- Stat stat = ((ZkConnection) _connection).getZookeeper().exists(path, false);
+ Stat stat = ((ZkConnection) getConnection()).getZookeeper().exists(path, false);
return stat;
}
});
@@ -776,14 +776,14 @@ public class ZkClient implements Watcher {
}
}
- private void processStateChanged(WatchedEvent event) {
+ /**
+  * Handles a connection-state event. Logs and records the new state, then returns
+  * immediately if shutdown has been triggered. Otherwise notifies the state listeners and,
+  * when the session has expired and this client manages its connection, reconnects.
+  *
+  * @param event the watched event carrying the new keeper state
+  */
+ protected void processStateChanged(WatchedEvent event) {
LOG.info("zookeeper state changed (" + event.getState() + ")");
setCurrentState(event.getState());
if (getShutdownTrigger()) {
return;
}
fireStateChangedEvent(event.getState());
- if (event.getState() == KeeperState.Expired) {
+ // Only a client that manages its own connection re-establishes an expired session.
+ if (isManagingZkConnection() && event.getState() == KeeperState.Expired) {
reconnectOnExpiring();
}
}
@@ -794,7 +794,7 @@ public class ZkClient implements Watcher {
new ExponentialBackoffStrategy(MAX_RECONNECT_INTERVAL_MS, true);
Exception reconnectException = new ZkException("Shutdown triggered.");
- while (!_closed) {
+ while (!isClosed()) {
try {
reconnect();
fireNewSessionEvents();
@@ -820,6 +820,19 @@ public class ZkClient implements Watcher {
fireSessionEstablishmentError(reconnectException);
}
+ /**
+  * Closes the current connection and connects it again with this client as the watcher.
+  * The event lock is held for the whole close/connect sequence and is always released.
+  *
+  * @throws ZkInterruptedException if the thread is interrupted while closing or connecting
+  */
+ private void reconnect() {
+ getEventLock().lock();
+ try {
+ IZkConnection connection = getConnection();
+ connection.close();
+ connection.connect(this);
+ } catch (InterruptedException e) {
+ throw new ZkInterruptedException(e);
+ } finally {
+ getEventLock().unlock();
+ }
+ }
+
private void fireNewSessionEvents() {
for (final IZkStateListener stateListener : _stateListener) {
_eventThread.send(new ZkEvent("New session event sent to " + stateListener) {
@@ -831,7 +844,7 @@ public class ZkClient implements Watcher {
}
}
- private void fireStateChangedEvent(final KeeperState state) {
+ protected void fireStateChangedEvent(final KeeperState state) {
for (final IZkStateListener stateListener : _stateListener) {
_eventThread.send(new ZkEvent("State changed to " + state + " sent to " + stateListener) {
@@ -1073,7 +1086,7 @@ public class ZkClient implements Watcher {
}
final long operationStartTime = System.currentTimeMillis();
while (true) {
- if (_closed) {
+ if (isClosed()) {
throw new IllegalStateException("ZkClient already closed!");
}
try {
@@ -1147,7 +1160,7 @@ public class ZkClient implements Watcher {
@Override
public Object call() throws Exception {
- _connection.delete(path);
+ getConnection().delete(path);
return null;
}
});
@@ -1226,7 +1239,7 @@ public class ZkClient implements Watcher {
data = retryUntilConnected(new Callable<byte[]>() {
@Override public byte[] call() throws Exception {
- return _connection.readData(path, stat, watch);
+ return getConnection().readData(path, stat, watch);
}
});
record(path, data, startT, ZkClientMonitor.AccessType.READ);
@@ -1299,7 +1312,7 @@ public class ZkClient implements Watcher {
checkDataSizeLimit(data);
final Stat stat = (Stat) retryUntilConnected(new Callable<Object>() {
@Override public Object call() throws Exception {
- return _connection.writeDataReturnStat(path, data, expectedVersion);
+ return getConnection().writeDataReturnStat(path, data, expectedVersion);
}
});
record(path, data, startT, ZkClientMonitor.AccessType.WRITE);
@@ -1326,7 +1339,7 @@ public class ZkClient implements Watcher {
final byte[] data = (datat == null ? null : serialize(datat, path));
retryUntilConnected(new Callable<Object>() {
@Override public Object call() throws Exception {
- ((ZkConnection) _connection).getZookeeper().create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ ((ZkConnection) getConnection()).getZookeeper().create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE,
// Arrays.asList(DEFAULT_ACL),
mode, cb, new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT,
data == null ? 0 : data.length, false));
@@ -1342,7 +1355,7 @@ public class ZkClient implements Watcher {
final byte[] data = serialize(datat, path);
retryUntilConnected(new Callable<Object>() {
@Override public Object call() throws Exception {
- ((ZkConnection) _connection).getZookeeper().setData(path, data, version, cb,
+ ((ZkConnection) getConnection()).getZookeeper().setData(path, data, version, cb,
new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT,
data == null ? 0 : data.length, false));
return null;
@@ -1354,7 +1367,7 @@ public class ZkClient implements Watcher {
final long startT = System.currentTimeMillis();
retryUntilConnected(new Callable<Object>() {
@Override public Object call() throws Exception {
- ((ZkConnection) _connection).getZookeeper().getData(path, null, cb,
+ ((ZkConnection) getConnection()).getZookeeper().getData(path, null, cb,
new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, true));
return null;
}
@@ -1365,7 +1378,7 @@ public class ZkClient implements Watcher {
final long startT = System.currentTimeMillis();
retryUntilConnected(new Callable<Object>() {
@Override public Object call() throws Exception {
- ((ZkConnection) _connection).getZookeeper().exists(path, null, cb,
+ ((ZkConnection) getConnection()).getZookeeper().exists(path, null, cb,
new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, true));
return null;
}
@@ -1376,7 +1389,7 @@ public class ZkClient implements Watcher {
final long startT = System.currentTimeMillis();
retryUntilConnected(new Callable<Object>() {
@Override public Object call() throws Exception {
- ((ZkConnection) _connection).getZookeeper().delete(path, -1, cb,
+ ((ZkConnection) getConnection()).getZookeeper().delete(path, -1, cb,
new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, false));
return null;
}
@@ -1394,7 +1407,7 @@ public class ZkClient implements Watcher {
public void watchForData(final String path) {
retryUntilConnected(new Callable<Object>() {
@Override public Object call() throws Exception {
- _connection.exists(path, true);
+ getConnection().exists(path, true);
return null;
}
});
@@ -1433,7 +1446,7 @@ public class ZkClient implements Watcher {
public void addAuthInfo(final String scheme, final byte[] auth) {
retryUntilConnected(new Callable<Object>() {
@Override public Object call() throws Exception {
- _connection.addAuthInfo(scheme, auth);
+ getConnection().addAuthInfo(scheme, auth);
return null;
}
});
@@ -1453,22 +1466,34 @@ public class ZkClient implements Watcher {
*/
public void connect(final long maxMsToWaitUntilConnected, Watcher watcher)
throws ZkInterruptedException, ZkTimeoutException, IllegalStateException {
- if (_closed) {
+ if (isClosed()) {
throw new IllegalStateException("ZkClient already closed!");
}
boolean started = false;
acquireEventLock();
try {
setShutdownTrigger(false);
- _eventThread = new ZkEventThread(_connection.getServers());
+
+ IZkConnection zkConnection = getConnection();
+ _eventThread = new ZkEventThread(zkConnection.getServers());
_eventThread.start();
- _connection.connect(watcher);
- LOG.debug("Awaiting connection to Zookeeper server");
- if (!waitUntilConnected(maxMsToWaitUntilConnected, TimeUnit.MILLISECONDS)) {
- throw new ZkTimeoutException(
- "Unable to connect to zookeeper server within timeout: " + maxMsToWaitUntilConnected);
+ if (isManagingZkConnection()) {
+ zkConnection.connect(watcher);
+ LOG.debug("Awaiting connection to Zookeeper server");
+ if (!waitUntilConnected(maxMsToWaitUntilConnected, TimeUnit.MILLISECONDS)) {
+ throw new ZkTimeoutException(
+ "Unable to connect to zookeeper server within timeout: " + maxMsToWaitUntilConnected);
+ }
+ } else {
+ // If the client is not managing the connection, the supplied connection must already be connected.
+ if (isConnectionClosed()) {
+ throw new HelixException(
+ "Unable to connect to zookeeper server with the specified ZkConnection");
+ }
+ setCurrentState(KeeperState.SyncConnected);
}
+
started = true;
} finally {
getEventLock().unlock();
@@ -1484,7 +1509,7 @@ public class ZkClient implements Watcher {
public long getCreationTime(String path) {
acquireEventLock();
try {
- return _connection.getCreateTime(path);
+ return getConnection().getCreateTime(path);
} catch (KeeperException e) {
throw ZkException.create(e);
} catch (InterruptedException e) {
@@ -1495,7 +1520,7 @@ public class ZkClient implements Watcher {
}
public String getServers() {
- return _connection.getServers();
+ return getConnection().getServers();
}
/**
@@ -1509,15 +1534,18 @@ public class ZkClient implements Watcher {
LOG.trace("closing a zkclient. callStack: " + Arrays.asList(calls));
}
getEventLock().lock();
+ IZkConnection connection = getConnection();
try {
- if (_connection == null || _closed) {
+ if (connection == null || _closed) {
return;
}
- LOG.info("Closing zkclient: " + ((ZkConnection) _connection).getZookeeper());
setShutdownTrigger(true);
_eventThread.interrupt();
_eventThread.join(2000);
- _connection.close();
+ if (isManagingZkConnection()) {
+ LOG.info("Closing zkclient: " + ((ZkConnection) connection).getZookeeper());
+ connection.close();
+ }
_closed = true;
// send state change notification to unlock any wait
@@ -1529,7 +1557,7 @@ public class ZkClient implements Watcher {
* Workaround for HELIX-264: calling ZkClient#close() in its own eventThread context will
* throw ZkInterruptedException and skip ZkConnection#close()
*/
- if (_connection != null) {
+ if (connection != null) {
try {
/**
* ZkInterruptedException#construct() honors InterruptedException by calling
@@ -1537,7 +1565,9 @@ public class ZkClient implements Watcher {
* zk-connection
*/
Thread.interrupted();
- _connection.close();
+ if (isManagingZkConnection()) {
+ connection.close();
+ }
/**
* restore interrupted status of current thread
*/
@@ -1556,26 +1586,20 @@ public class ZkClient implements Watcher {
}
+ /**
+  * @return true if this client has been closed; the flag is read while holding the event lock
+  */
public boolean isClosed() {
- return _closed;
- }
-
- public boolean isConnectionClosed() {
- return (_connection == null || _connection.getZookeeperState() == null ||
- !_connection.getZookeeperState().isAlive());
- }
-
- private void reconnect() {
+ // Acquire the lock before entering try, so that finally never unlocks a lock that
+ // was not successfully acquired.
getEventLock().lock();
try {
- _connection.close();
- _connection.connect(this);
- } catch (InterruptedException e) {
- throw new ZkInterruptedException(e);
+ return _closed;
} finally {
getEventLock().unlock();
}
}
+ /**
+  * Reports whether the underlying ZooKeeper connection is unusable.
+  *
+  * @return true if there is no connection, the connection reports no state, or the
+  *         reported state is not alive
+  */
+ public boolean isConnectionClosed() {
+   IZkConnection connection = getConnection();
+   if (connection == null) {
+     return true;
+   }
+   // Read the state once: reconnect() closes and re-opens the connection, so a second
+   // read after the null check may no longer return the same (or any) state.
+   ZooKeeper.States state = connection.getZookeeperState();
+   return state == null || !state.isAlive();
+ }
+
public void setShutdownTrigger(boolean triggerState) {
_shutdownTriggered = triggerState;
}
@@ -1605,11 +1629,29 @@ public class ZkClient implements Watcher {
return retryUntilConnected(new Callable<List<OpResult>>() {
@Override public List<OpResult> call() throws Exception {
- return _connection.multi(ops);
+ return getConnection().multi(ops);
}
});
}
+ /**
+ * Tells whether this client owns the lifecycle of its ZkConnection. When this returns
+ * false, connect() does not open the connection, close() does not close it, and an
+ * expired session does not trigger a reconnect. This base implementation always
+ * returns true.
+ *
+ * @return true if this ZkClient is managing the ZkConnection.
+ */
+ protected boolean isManagingZkConnection() {
+ return true;
+ }
+
+ /**
+  * Returns the session id of the ZooKeeper handle currently held by the connection.
+  *
+  * @return the ZooKeeper session id
+  * @throws HelixException if the connection or its ZooKeeper handle is not available
+  */
+ public long getSessionId() {
+   ZkConnection zkConnection = ((ZkConnection) getConnection());
+   // Fetch the handle once and use that same reference below: reconnect() closes and
+   // re-opens the connection, so fetching it again after the null check may yield a
+   // different handle or none at all.
+   ZooKeeper zk = zkConnection == null ? null : zkConnection.getZookeeper();
+   if (zk == null) {
+     throw new HelixException(
+         "ZooKeeper connection information is not available now. ZkClient might be disconnected.");
+   }
+   return zk.getSessionId();
+ }
+
// operations to update monitor's counters
private void record(String path, byte[] data, long startTimeMilliSec, ZkClientMonitor.AccessType accessType) {
if (_monitor != null) {
http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java b/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java
index d458c52..794e9e1 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/HelixCustomCodeRunner.java
@@ -31,7 +31,8 @@ import org.apache.helix.ZNRecord;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.slf4j.Logger;
@@ -129,14 +130,15 @@ public class HelixCustomCodeRunner {
StateMachineEngine stateMach = _manager.getStateMachineEngine();
stateMach.registerStateModelFactory(LEADER_STANDBY, _stateModelFty, _resourceName);
- ZkClient zkClient = null;
+ HelixZkClient zkClient = null;
try {
// manually add ideal state for participant leader using LeaderStandby
// model
+ HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
+ clientConfig.setZkSerializer(new ZNRecordSerializer());
+ zkClient = SharedZkClientFactory
+ .getInstance().buildZkClient(new HelixZkClient.ZkConnectionConfig(_zkAddr), clientConfig);
- zkClient =
- new ZkClient(_zkAddr, ZkClient.DEFAULT_SESSION_TIMEOUT,
- ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
HelixDataAccessor accessor =
new ZKHelixDataAccessor(_manager.getClusterName(), new ZkBaseDataAccessor<ZNRecord>(
zkClient));
@@ -161,9 +163,7 @@ public class HelixCustomCodeRunner {
LOG.info("Set idealState for participantLeader:" + _resourceName + ", idealState:"
+ idealState);
} finally {
- if (zkClient != null && zkClient.getConnection() != null)
-
- {
+ if (zkClient != null && !zkClient.isClosed()) {
zkClient.close();
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/main/java/org/apache/helix/tools/MessagePoster.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/MessagePoster.java b/helix-core/src/main/java/org/apache/helix/tools/MessagePoster.java
index 80a7820..b1d6582 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/MessagePoster.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/MessagePoster.java
@@ -24,15 +24,17 @@ import java.util.UUID;
import org.apache.helix.PropertyPathBuilder;
import org.apache.helix.ZNRecord;
import org.apache.helix.manager.zk.ZNRecordSerializer;
-import org.apache.helix.manager.zk.ZkClient;
-import org.apache.helix.model.Message;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
import org.apache.helix.model.LiveInstance.LiveInstanceProperty;
+import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageState;
import org.apache.helix.model.Message.MessageType;
public class MessagePoster {
public void post(String zkServer, Message message, String clusterName, String instanceName) {
- ZkClient client = new ZkClient(zkServer);
+ HelixZkClient client = SharedZkClientFactory.getInstance().buildZkClient(new HelixZkClient.ZkConnectionConfig(
+ zkServer));
client.setZkSerializer(new ZNRecordSerializer());
String path = PropertyPathBuilder.instanceMessage(clusterName, instanceName, message.getId());
client.delete(path);
http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/main/java/org/apache/helix/tools/TestExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/TestExecutor.java b/helix-core/src/main/java/org/apache/helix/tools/TestExecutor.java
index dd6f3a9..908bba5 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/TestExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/TestExecutor.java
@@ -35,13 +35,15 @@ import org.apache.helix.HelixManager;
import org.apache.helix.ZNRecord;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
import org.apache.helix.store.PropertyJsonComparator;
import org.apache.helix.store.PropertyJsonSerializer;
import org.apache.helix.store.PropertyStoreException;
import org.apache.helix.tools.TestCommand.CommandType;
+import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.zookeeper.data.Stat;
/**
* a test is structured logically as a list of commands a command has three parts: COMMAND
@@ -747,10 +749,11 @@ public class TestExecutor {
String zkAddr, CountDownLatch countDown) {
final Map<TestCommand, Boolean> testResults = new ConcurrentHashMap<TestCommand, Boolean>();
- ZkClient zkClient = null;
- zkClient = new ZkClient(zkAddr, ZkClient.DEFAULT_CONNECTION_TIMEOUT);
- zkClient.setZkSerializer(new ZNRecordSerializer());
+ HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
+ clientConfig.setZkSerializer(new ZNRecordSerializer());
+ HelixZkClient zkClient = SharedZkClientFactory
+ .getInstance().buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr), clientConfig);
// sort on trigger's start time, stable sort
Collections.sort(commandList, new Comparator<TestCommand>() {
@@ -765,7 +768,7 @@ public class TestExecutor {
TestTrigger trigger = command._trigger;
command._startTimestamp = System.currentTimeMillis() + trigger._startTime;
- new Thread(new ExecuteCommand(command._startTimestamp, command, countDown, zkClient,
+ new Thread(new ExecuteCommand(command._startTimestamp, command, countDown, (ZkClient) zkClient,
testResults)).start();
}
http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZKDumper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZKDumper.java b/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZKDumper.java
index c171b73..cf7f22e 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZKDumper.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZKDumper.java
@@ -38,14 +38,15 @@ import org.apache.commons.cli.OptionGroup;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.helix.manager.zk.ByteArraySerializer;
-import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
/**
* Dumps the Zookeeper file structure on to Disk
*/
@SuppressWarnings("static-access")
public class ZKDumper {
- private ZkClient client;
+ private HelixZkClient client;
private FilenameFilter filter;
static Options options;
private String suffix = "";
@@ -110,7 +111,8 @@ public class ZKDumper {
}
public ZKDumper(String zkAddress) {
- client = new ZkClient(zkAddress, ZkClient.DEFAULT_CONNECTION_TIMEOUT);
+ client = SharedZkClientFactory.getInstance()
+ .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress));
ZkSerializer zkSerializer = new ByteArraySerializer();
client.setZkSerializer(zkSerializer);
http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZkCopy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZkCopy.java b/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZkCopy.java
index 5bd955a..805847c 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZkCopy.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZkCopy.java
@@ -37,11 +37,12 @@ import org.apache.helix.AccessOption;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.manager.zk.ByteArraySerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
import org.apache.zookeeper.common.PathUtils;
import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Tool for copying a zk/file path to another zk/file path
@@ -99,7 +100,7 @@ public class ZkCopy {
* @param dstRootPath
* @param paths
*/
- private static void copy(ZkClient srcClient, String srcRootPath, ZkClient dstClient,
+ private static void copy(HelixZkClient srcClient, String srcRootPath, HelixZkClient dstClient,
String dstRootPath, List<String> paths) {
BaseDataAccessor<Object> srcAccessor = new ZkBaseDataAccessor<Object>(srcClient);
List<String> readPaths = new ArrayList<String>();
@@ -146,7 +147,8 @@ public class ZkCopy {
}
}
- private static void zkCopy(ZkClient srcClient, String srcRootPath, ZkClient dstClient, String dstRootPath) {
+ private static void zkCopy(HelixZkClient srcClient, String srcRootPath, HelixZkClient dstClient,
+ String dstRootPath) {
// Strip off tailing "/"
if (!srcRootPath.equals("/") && srcRootPath.endsWith("/")) {
srcRootPath = srcRootPath.substring(0, srcRootPath.length() - 1);
@@ -218,21 +220,21 @@ public class ZkCopy {
String srcZkAddr = srcUri.getAuthority();
String dstZkAddr = dstUri.getAuthority();
- ZkClient srcClient = null;
- ZkClient dstClient = null;
+ HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
+ HelixZkClient srcClient = null;
+ HelixZkClient dstClient = null;
try {
if (srcZkAddr.equals(dstZkAddr)) {
- srcClient =
- dstClient =
- new ZkClient(srcZkAddr, ZkClient.DEFAULT_SESSION_TIMEOUT,
- ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ByteArraySerializer());
+ clientConfig.setZkSerializer(new ByteArraySerializer());
+ srcClient = dstClient = SharedZkClientFactory.getInstance()
+ .buildZkClient(new HelixZkClient.ZkConnectionConfig(srcZkAddr), clientConfig);
} else {
- srcClient =
- new ZkClient(srcZkAddr, ZkClient.DEFAULT_SESSION_TIMEOUT,
- ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ByteArraySerializer());
- dstClient =
- new ZkClient(dstZkAddr, ZkClient.DEFAULT_SESSION_TIMEOUT,
- ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ByteArraySerializer());
+ clientConfig.setZkSerializer(new ByteArraySerializer());
+ srcClient = SharedZkClientFactory.getInstance()
+ .buildZkClient(new HelixZkClient.ZkConnectionConfig(srcZkAddr), clientConfig);
+ clientConfig.setZkSerializer(new ByteArraySerializer());
+ dstClient = SharedZkClientFactory.getInstance()
+ .buildZkClient(new HelixZkClient.ZkConnectionConfig(dstZkAddr), clientConfig);
}
String srcPath = srcUri.getPath();
String dstPath = dstUri.getPath();
http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java b/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java
index d3f447a..63d87eb 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java
@@ -427,7 +427,7 @@ public class TestResourceGroupEndtoEnd extends ZkTestBase {
@Override
public ZkClient getZkClient() {
- return _zkclient;
+ return (ZkClient) _zkclient;
}
@Override
http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
index ffa2cb2..96f4a88 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterControllerManager.java
@@ -89,7 +89,7 @@ public class ClusterControllerManager extends ZKHelixManager implements Runnable
@Override
public ZkClient getZkClient() {
- return _zkclient;
+ return (ZkClient) _zkclient;
}
@Override
http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java
index 1cce08d..b186a1a 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/ClusterDistributedController.java
@@ -83,7 +83,7 @@ public class ClusterDistributedController extends ZKHelixManager implements Runn
@Override
public ZkClient getZkClient() {
- return _zkclient;
+ return (ZkClient) _zkclient;
}
@Override
http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
index 2bd2630..362709a 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java
@@ -128,7 +128,7 @@ public class MockParticipantManager extends ZKHelixManager implements Runnable,
@Override
public ZkClient getZkClient() {
- return _zkclient;
+ return (ZkClient) _zkclient;
}
@Override
http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
new file mode 100644
index 0000000..d0cf004
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
@@ -0,0 +1,294 @@
+package org.apache.helix.manager.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.lang.management.ManagementFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.I0Itec.zkclient.IZkStateListener;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.helix.HelixException;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.monitoring.mbeans.MBeanRegistrar;
+import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
+import org.apache.helix.monitoring.mbeans.ZkClientMonitor;
+import org.apache.helix.monitoring.mbeans.ZkClientPathMonitor;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.AssertJUnit;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestRawZkClient extends ZkUnitTestBase {
+ private static Logger LOG = LoggerFactory.getLogger(TestRawZkClient.class);
+
+ ZkClient _zkClient;
+
+ @BeforeClass
+ public void beforeClass() {
+ _zkClient = new ZkClient(ZK_ADDR);
+ _zkClient.setZkSerializer(new ZNRecordSerializer());
+ }
+
+ @AfterClass
+ public void afterClass() {
+ _zkClient.close();
+ }
+
+ @Test()
+ void testGetStat() {
+ String path = "/tmp/getStatTest";
+ _zkClient.deleteRecursively(path);
+
+ Stat stat, newStat;
+ stat = _zkClient.getStat(path);
+ AssertJUnit.assertNull(stat);
+ _zkClient.createPersistent(path, true);
+
+ stat = _zkClient.getStat(path);
+ AssertJUnit.assertNotNull(stat);
+
+ newStat = _zkClient.getStat(path);
+ AssertJUnit.assertEquals(stat, newStat);
+
+ _zkClient.writeData(path, new ZNRecord("Test"));
+ newStat = _zkClient.getStat(path);
+ AssertJUnit.assertNotSame(stat, newStat);
+ }
+
+ @Test()
+ void testSessionExpire() throws Exception {
+ IZkStateListener listener = new IZkStateListener() {
+
+ @Override
+ public void handleStateChanged(KeeperState state) throws Exception {
+ System.out.println("In Old connection New state " + state);
+ }
+
+ @Override
+ public void handleNewSession() throws Exception {
+ System.out.println("In Old connection New session");
+ }
+
+ @Override
+ public void handleSessionEstablishmentError(Throwable var1) throws Exception {
+ }
+ };
+
+ _zkClient.subscribeStateChanges(listener);
+ ZkConnection connection = ((ZkConnection) _zkClient.getConnection());
+ ZooKeeper zookeeper = connection.getZookeeper();
+ System.out.println("old sessionId= " + zookeeper.getSessionId());
+ Watcher watcher = new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ System.out.println("In New connection In process event:" + event);
+ }
+ };
+ ZooKeeper newZookeeper =
+ new ZooKeeper(connection.getServers(), zookeeper.getSessionTimeout(), watcher,
+ zookeeper.getSessionId(), zookeeper.getSessionPasswd());
+ Thread.sleep(3000);
+ System.out.println("New sessionId= " + newZookeeper.getSessionId());
+ Thread.sleep(3000);
+ newZookeeper.close();
+ Thread.sleep(10000);
+ connection = ((ZkConnection) _zkClient.getConnection());
+ zookeeper = connection.getZookeeper();
+ System.out.println("After session expiry sessionId= " + zookeeper.getSessionId());
+ }
+
+ @Test(expectedExceptions = HelixException.class, expectedExceptionsMessageRegExp = "Data size larger than 1M.*")
+ void testDataSizeLimit() {
+ ZNRecord data = new ZNRecord(new String(new char[1024 * 1024]));
+ _zkClient.writeData("/test", data, -1);
+ }
+
+ @Test
+ public void testZkClientMonitor() throws Exception {
+ final String TEST_TAG = "test_monitor";
+ final String TEST_KEY = "test_key";
+ final String TEST_DATA = "testData";
+ final String TEST_ROOT = "/my_cluster/IDEALSTATES";
+ final String TEST_NODE = "/test_zkclient_monitor";
+ final String TEST_PATH = TEST_ROOT + TEST_NODE;
+
+ ZkClient.Builder builder = new ZkClient.Builder();
+ builder.setZkServer(ZK_ADDR).setMonitorKey(TEST_KEY).setMonitorType(TEST_TAG)
+ .setMonitorRootPathOnly(false);
+ ZkClient zkClient = builder.build();
+
+ final long TEST_DATA_SIZE = zkClient.serialize(TEST_DATA, TEST_PATH).length;
+
+ if (_zkClient.exists(TEST_PATH)) {
+ _zkClient.delete(TEST_PATH);
+ }
+ if (!_zkClient.exists(TEST_ROOT)) {
+ _zkClient.createPersistent(TEST_ROOT, true);
+ }
+
+ MBeanServer beanServer = ManagementFactory.getPlatformMBeanServer();
+
+ ObjectName name = MBeanRegistrar
+ .buildObjectName(MonitorDomainNames.HelixZkClient.name(), ZkClientMonitor.MONITOR_TYPE,
+ TEST_TAG, ZkClientMonitor.MONITOR_KEY, TEST_KEY);
+ ObjectName rootname = MBeanRegistrar
+ .buildObjectName(MonitorDomainNames.HelixZkClient.name(), ZkClientMonitor.MONITOR_TYPE,
+ TEST_TAG, ZkClientMonitor.MONITOR_KEY, TEST_KEY, ZkClientPathMonitor.MONITOR_PATH,
+ "Root");
+ ObjectName idealStatename = MBeanRegistrar
+ .buildObjectName(MonitorDomainNames.HelixZkClient.name(), ZkClientMonitor.MONITOR_TYPE,
+ TEST_TAG, ZkClientMonitor.MONITOR_KEY, TEST_KEY, ZkClientPathMonitor.MONITOR_PATH,
+ "IdealStates");
+ Assert.assertTrue(beanServer.isRegistered(rootname));
+ Assert.assertTrue(beanServer.isRegistered(idealStatename));
+
+ // Test exists
+ Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 0);
+ Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadTotalLatencyCounter"), 0);
+ Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadLatencyGauge.Max"), 0);
+ zkClient.exists(TEST_ROOT);
+ Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 1);
+ Assert.assertTrue((long) beanServer.getAttribute(rootname, "ReadTotalLatencyCounter") >= 0);
+ Assert.assertTrue((long) beanServer.getAttribute(rootname, "ReadLatencyGauge.Max") >= 0);
+
+ // Test create
+ Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteCounter"), 0);
+ Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteBytesCounter"), 0);
+ Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteCounter"), 0);
+ Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteBytesCounter"), 0);
+ Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteTotalLatencyCounter"), 0);
+ Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteLatencyGauge.Max"), 0);
+ Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteTotalLatencyCounter"),
+ 0);
+ Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteLatencyGauge.Max"), 0);
+ zkClient.create(TEST_PATH, TEST_DATA, CreateMode.PERSISTENT);
+ Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteCounter"), 1);
+ Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteBytesCounter"),
+ TEST_DATA_SIZE);
+ Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteCounter"), 1);
+ Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteBytesCounter"),
+ TEST_DATA_SIZE);
+ long origWriteTotalLatencyCounter =
+ (long) beanServer.getAttribute(rootname, "WriteTotalLatencyCounter");
+ Assert.assertTrue(origWriteTotalLatencyCounter >= 0);
+ Assert.assertTrue((long) beanServer.getAttribute(rootname, "WriteLatencyGauge.Max") >= 0);
+ long origIdealStatesWriteTotalLatencyCounter =
+ (long) beanServer.getAttribute(idealStatename, "WriteTotalLatencyCounter");
+ Assert.assertTrue(origIdealStatesWriteTotalLatencyCounter >= 0);
+ Assert.assertTrue((long) beanServer.getAttribute(idealStatename, "WriteLatencyGauge.Max") >= 0);
+
+ // Test read
+ Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 1);
+ Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadBytesCounter"), 0);
+ Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadCounter"), 0);
+ Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadBytesCounter"), 0);
+ long origReadTotalLatencyCounter =
+ (long) beanServer.getAttribute(rootname, "ReadTotalLatencyCounter");
+ long origIdealStatesReadTotalLatencyCounter =
+ (long) beanServer.getAttribute(idealStatename, "ReadTotalLatencyCounter");
+ Assert.assertEquals(origIdealStatesReadTotalLatencyCounter, 0);
+ Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadLatencyGauge.Max"), 0);
+ zkClient.readData(TEST_PATH, new Stat());
+ Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 2);
+ Assert
+ .assertEquals((long) beanServer.getAttribute(rootname, "ReadBytesCounter"), TEST_DATA_SIZE);
+ Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadCounter"), 1);
+ Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadBytesCounter"),
+ TEST_DATA_SIZE);
+ Assert.assertTrue((long) beanServer.getAttribute(rootname, "ReadTotalLatencyCounter")
+ >= origReadTotalLatencyCounter);
+ Assert.assertTrue((long) beanServer.getAttribute(idealStatename, "ReadTotalLatencyCounter")
+ >= origIdealStatesReadTotalLatencyCounter);
+ Assert.assertTrue((long) beanServer.getAttribute(idealStatename, "ReadLatencyGauge.Max") >= 0);
+ zkClient.getChildren(TEST_PATH);
+ Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 3);
+ Assert
+ .assertEquals((long) beanServer.getAttribute(rootname, "ReadBytesCounter"), TEST_DATA_SIZE);
+ Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadCounter"), 2);
+ Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadBytesCounter"),
+ TEST_DATA_SIZE);
+ zkClient.getStat(TEST_PATH);
+ Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 4);
+ Assert
+ .assertEquals((long) beanServer.getAttribute(rootname, "ReadBytesCounter"), TEST_DATA_SIZE);
+ Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadCounter"), 3);
+ Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadBytesCounter"),
+ TEST_DATA_SIZE);
+ zkClient.readDataAndStat(TEST_PATH, new Stat(), true);
+ Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 5);
+
+ ZkAsyncCallbacks.ExistsCallbackHandler callbackHandler =
+ new ZkAsyncCallbacks.ExistsCallbackHandler();
+ zkClient.asyncExists(TEST_PATH, callbackHandler);
+ callbackHandler.waitForSuccess();
+ Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 6);
+
+ // Test write
+ zkClient.writeData(TEST_PATH, TEST_DATA);
+ Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteCounter"), 2);
+ Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteBytesCounter"),
+ TEST_DATA_SIZE * 2);
+ Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteCounter"), 2);
+ Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteBytesCounter"),
+ TEST_DATA_SIZE * 2);
+ Assert.assertTrue((long) beanServer.getAttribute(rootname, "WriteTotalLatencyCounter")
+ >= origWriteTotalLatencyCounter);
+ Assert.assertTrue((long) beanServer.getAttribute(idealStatename, "WriteTotalLatencyCounter")
+ >= origIdealStatesWriteTotalLatencyCounter);
+
+ // Test data change count
+ final Lock lock = new ReentrantLock();
+ final Condition callbackFinish = lock.newCondition();
+ zkClient.subscribeDataChanges(TEST_PATH, new IZkDataListener() {
+ @Override
+ public void handleDataChange(String dataPath, Object data) throws Exception {
+ }
+
+ @Override
+ public void handleDataDeleted(String dataPath) throws Exception {
+ lock.lock();
+ try {
+ callbackFinish.signal();
+ } finally {
+ lock.unlock();
+ }
+ }
+ });
+ lock.lock();
+ _zkClient.delete(TEST_PATH);
+ Assert.assertTrue(callbackFinish.await(10, TimeUnit.SECONDS));
+ Assert.assertEquals((long) beanServer.getAttribute(name, "DataChangeEventCounter"), 1);
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClient.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClient.java
deleted file mode 100644
index a18dd29..0000000
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClient.java
+++ /dev/null
@@ -1,294 +0,0 @@
-package org.apache.helix.manager.zk;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.lang.management.ManagementFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-import org.I0Itec.zkclient.IZkDataListener;
-import org.I0Itec.zkclient.IZkStateListener;
-import org.I0Itec.zkclient.ZkConnection;
-import org.apache.helix.HelixException;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.ZkUnitTestBase;
-import org.apache.helix.monitoring.mbeans.MBeanRegistrar;
-import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
-import org.apache.helix.monitoring.mbeans.ZkClientMonitor;
-import org.apache.helix.monitoring.mbeans.ZkClientPathMonitor;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.Assert;
-import org.testng.AssertJUnit;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-public class TestZkClient extends ZkUnitTestBase {
- private static Logger LOG = LoggerFactory.getLogger(TestZkClient.class);
-
- ZkClient _zkClient;
-
- @BeforeClass
- public void beforeClass() {
- _zkClient = new ZkClient(ZK_ADDR);
- _zkClient.setZkSerializer(new ZNRecordSerializer());
- }
-
- @AfterClass
- public void afterClass() {
- _zkClient.close();
- }
-
- @Test()
- void testGetStat() {
- String path = "/tmp/getStatTest";
- _zkClient.deleteRecursively(path);
-
- Stat stat, newStat;
- stat = _zkClient.getStat(path);
- AssertJUnit.assertNull(stat);
- _zkClient.createPersistent(path, true);
-
- stat = _zkClient.getStat(path);
- AssertJUnit.assertNotNull(stat);
-
- newStat = _zkClient.getStat(path);
- AssertJUnit.assertEquals(stat, newStat);
-
- _zkClient.writeData(path, new ZNRecord("Test"));
- newStat = _zkClient.getStat(path);
- AssertJUnit.assertNotSame(stat, newStat);
- }
-
- @Test()
- void testSessionExpire() throws Exception {
- IZkStateListener listener = new IZkStateListener() {
-
- @Override
- public void handleStateChanged(KeeperState state) throws Exception {
- System.out.println("In Old connection New state " + state);
- }
-
- @Override
- public void handleNewSession() throws Exception {
- System.out.println("In Old connection New session");
- }
-
- @Override
- public void handleSessionEstablishmentError(Throwable var1) throws Exception {
- }
- };
-
- _zkClient.subscribeStateChanges(listener);
- ZkConnection connection = ((ZkConnection) _zkClient.getConnection());
- ZooKeeper zookeeper = connection.getZookeeper();
- System.out.println("old sessionId= " + zookeeper.getSessionId());
- Watcher watcher = new Watcher() {
- @Override
- public void process(WatchedEvent event) {
- System.out.println("In New connection In process event:" + event);
- }
- };
- ZooKeeper newZookeeper =
- new ZooKeeper(connection.getServers(), zookeeper.getSessionTimeout(), watcher,
- zookeeper.getSessionId(), zookeeper.getSessionPasswd());
- Thread.sleep(3000);
- System.out.println("New sessionId= " + newZookeeper.getSessionId());
- Thread.sleep(3000);
- newZookeeper.close();
- Thread.sleep(10000);
- connection = ((ZkConnection) _zkClient.getConnection());
- zookeeper = connection.getZookeeper();
- System.out.println("After session expiry sessionId= " + zookeeper.getSessionId());
- }
-
- @Test(expectedExceptions = HelixException.class, expectedExceptionsMessageRegExp = "Data size larger than 1M.*")
- void testDataSizeLimit() {
- ZNRecord data = new ZNRecord(new String(new char[1024 * 1024]));
- _zkClient.writeData("/test", data, -1);
- }
-
- @Test
- public void testZkClientMonitor() throws Exception {
- final String TEST_TAG = "test_monitor";
- final String TEST_KEY = "test_key";
- final String TEST_DATA = "testData";
- final String TEST_ROOT = "/my_cluster/IDEALSTATES";
- final String TEST_NODE = "/test_zkclient_monitor";
- final String TEST_PATH = TEST_ROOT + TEST_NODE;
-
- ZkClient.Builder builder = new ZkClient.Builder();
- builder.setZkServer(ZK_ADDR).setMonitorKey(TEST_KEY).setMonitorType(TEST_TAG)
- .setMonitorRootPathOnly(false);
- ZkClient zkClient = builder.build();
-
- final long TEST_DATA_SIZE = zkClient.serialize(TEST_DATA, TEST_PATH).length;
-
- if (_zkClient.exists(TEST_PATH)) {
- _zkClient.delete(TEST_PATH);
- }
- if (!_zkClient.exists(TEST_ROOT)) {
- _zkClient.createPersistent(TEST_ROOT, true);
- }
-
- MBeanServer beanServer = ManagementFactory.getPlatformMBeanServer();
-
- ObjectName name = MBeanRegistrar
- .buildObjectName(MonitorDomainNames.HelixZkClient.name(), ZkClientMonitor.MONITOR_TYPE,
- TEST_TAG, ZkClientMonitor.MONITOR_KEY, TEST_KEY);
- ObjectName rootname = MBeanRegistrar
- .buildObjectName(MonitorDomainNames.HelixZkClient.name(), ZkClientMonitor.MONITOR_TYPE,
- TEST_TAG, ZkClientMonitor.MONITOR_KEY, TEST_KEY, ZkClientPathMonitor.MONITOR_PATH,
- "Root");
- ObjectName idealStatename = MBeanRegistrar
- .buildObjectName(MonitorDomainNames.HelixZkClient.name(), ZkClientMonitor.MONITOR_TYPE,
- TEST_TAG, ZkClientMonitor.MONITOR_KEY, TEST_KEY, ZkClientPathMonitor.MONITOR_PATH,
- "IdealStates");
- Assert.assertTrue(beanServer.isRegistered(rootname));
- Assert.assertTrue(beanServer.isRegistered(idealStatename));
-
- // Test exists
- Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 0);
- Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadTotalLatencyCounter"), 0);
- Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadLatencyGauge.Max"), 0);
- zkClient.exists(TEST_ROOT);
- Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 1);
- Assert.assertTrue((long) beanServer.getAttribute(rootname, "ReadTotalLatencyCounter") >= 0);
- Assert.assertTrue((long) beanServer.getAttribute(rootname, "ReadLatencyGauge.Max") >= 0);
-
- // Test create
- Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteCounter"), 0);
- Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteBytesCounter"), 0);
- Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteCounter"), 0);
- Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteBytesCounter"), 0);
- Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteTotalLatencyCounter"), 0);
- Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteLatencyGauge.Max"), 0);
- Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteTotalLatencyCounter"),
- 0);
- Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteLatencyGauge.Max"), 0);
- zkClient.create(TEST_PATH, TEST_DATA, CreateMode.PERSISTENT);
- Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteCounter"), 1);
- Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteBytesCounter"),
- TEST_DATA_SIZE);
- Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteCounter"), 1);
- Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteBytesCounter"),
- TEST_DATA_SIZE);
- long origWriteTotalLatencyCounter =
- (long) beanServer.getAttribute(rootname, "WriteTotalLatencyCounter");
- Assert.assertTrue(origWriteTotalLatencyCounter >= 0);
- Assert.assertTrue((long) beanServer.getAttribute(rootname, "WriteLatencyGauge.Max") >= 0);
- long origIdealStatesWriteTotalLatencyCounter =
- (long) beanServer.getAttribute(idealStatename, "WriteTotalLatencyCounter");
- Assert.assertTrue(origIdealStatesWriteTotalLatencyCounter >= 0);
- Assert.assertTrue((long) beanServer.getAttribute(idealStatename, "WriteLatencyGauge.Max") >= 0);
-
- // Test read
- Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 1);
- Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadBytesCounter"), 0);
- Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadCounter"), 0);
- Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadBytesCounter"), 0);
- long origReadTotalLatencyCounter =
- (long) beanServer.getAttribute(rootname, "ReadTotalLatencyCounter");
- long origIdealStatesReadTotalLatencyCounter =
- (long) beanServer.getAttribute(idealStatename, "ReadTotalLatencyCounter");
- Assert.assertEquals(origIdealStatesReadTotalLatencyCounter, 0);
- Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadLatencyGauge.Max"), 0);
- zkClient.readData(TEST_PATH, new Stat());
- Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 2);
- Assert
- .assertEquals((long) beanServer.getAttribute(rootname, "ReadBytesCounter"), TEST_DATA_SIZE);
- Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadCounter"), 1);
- Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadBytesCounter"),
- TEST_DATA_SIZE);
- Assert.assertTrue((long) beanServer.getAttribute(rootname, "ReadTotalLatencyCounter")
- >= origReadTotalLatencyCounter);
- Assert.assertTrue((long) beanServer.getAttribute(idealStatename, "ReadTotalLatencyCounter")
- >= origIdealStatesReadTotalLatencyCounter);
- Assert.assertTrue((long) beanServer.getAttribute(idealStatename, "ReadLatencyGauge.Max") >= 0);
- zkClient.getChildren(TEST_PATH);
- Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 3);
- Assert
- .assertEquals((long) beanServer.getAttribute(rootname, "ReadBytesCounter"), TEST_DATA_SIZE);
- Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadCounter"), 2);
- Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadBytesCounter"),
- TEST_DATA_SIZE);
- zkClient.getStat(TEST_PATH);
- Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 4);
- Assert
- .assertEquals((long) beanServer.getAttribute(rootname, "ReadBytesCounter"), TEST_DATA_SIZE);
- Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadCounter"), 3);
- Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadBytesCounter"),
- TEST_DATA_SIZE);
- zkClient.readDataAndStat(TEST_PATH, new Stat(), true);
- Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 5);
-
- ZkAsyncCallbacks.ExistsCallbackHandler callbackHandler =
- new ZkAsyncCallbacks.ExistsCallbackHandler();
- zkClient.asyncExists(TEST_PATH, callbackHandler);
- callbackHandler.waitForSuccess();
- Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 6);
-
- // Test write
- zkClient.writeData(TEST_PATH, TEST_DATA);
- Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteCounter"), 2);
- Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteBytesCounter"),
- TEST_DATA_SIZE * 2);
- Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteCounter"), 2);
- Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteBytesCounter"),
- TEST_DATA_SIZE * 2);
- Assert.assertTrue((long) beanServer.getAttribute(rootname, "WriteTotalLatencyCounter")
- >= origWriteTotalLatencyCounter);
- Assert.assertTrue((long) beanServer.getAttribute(idealStatename, "WriteTotalLatencyCounter")
- >= origIdealStatesWriteTotalLatencyCounter);
-
- // Test data change count
- final Lock lock = new ReentrantLock();
- final Condition callbackFinish = lock.newCondition();
- zkClient.subscribeDataChanges(TEST_PATH, new IZkDataListener() {
- @Override
- public void handleDataChange(String dataPath, Object data) throws Exception {
- }
-
- @Override
- public void handleDataDeleted(String dataPath) throws Exception {
- lock.lock();
- try {
- callbackFinish.signal();
- } finally {
- lock.unlock();
- }
- }
- });
- lock.lock();
- _zkClient.delete(TEST_PATH);
- Assert.assertTrue(callbackFinish.await(10, TimeUnit.SECONDS));
- Assert.assertEquals((long) beanServer.getAttribute(name, "DataChangeEventCounter"), 1);
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkReconnect.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkReconnect.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkReconnect.java
index 1f72948..691623e 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkReconnect.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkReconnect.java
@@ -32,6 +32,7 @@ import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.TestHelper;
+import org.apache.helix.manager.zk.client.HelixZkClient;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.tools.ClusterSetup;
import org.apache.zookeeper.WatchedEvent;
@@ -94,7 +95,7 @@ public class TestZkReconnect {
// 1. shutdown zkServer and check if handler trigger callback
zkServer.shutdown();
// Simulate a retry in ZkClient that will not succeed
- injectExpire(controller._zkclient);
+ injectExpire((ZkClient) controller._zkclient);
Assert.assertFalse(controller._zkclient.waitUntilConnected(5000, TimeUnit.MILLISECONDS));
// While retrying, onDisconnectedFlag = false
Assert.assertFalse(onDisconnectedFlag.get());
@@ -102,7 +103,7 @@ public class TestZkReconnect {
// 2. restart zkServer and check if handler will recover connection
zkServer.start();
Assert.assertTrue(controller._zkclient
- .waitUntilConnected(ZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS));
+ .waitUntilConnected(HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS));
Assert.assertTrue(controller.isConnected());
// New propertyStore should be in good state
http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/test/java/org/apache/helix/manager/zk/client/TestHelixZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/client/TestHelixZkClient.java b/helix-core/src/test/java/org/apache/helix/manager/zk/client/TestHelixZkClient.java
new file mode 100644
index 0000000..67e2731
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/client/TestHelixZkClient.java
@@ -0,0 +1,197 @@
+package org.apache.helix.manager.zk.client;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.concurrent.TimeUnit;
+
+import org.I0Itec.zkclient.IZkDataListener;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.helix.HelixException;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZkUnitTestBase;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestHelixZkClient extends ZkUnitTestBase {
+ final String TEST_NODE = "/test_helix_zkclient";
+
+ @Test public void testZkConnectionManager() {
+ final String TEST_ROOT = "/testZkConnectionManager/IDEALSTATES";
+ final String TEST_PATH = TEST_ROOT + TEST_NODE;
+
+ ZkConnectionManager zkConnectionManager =
+ new ZkConnectionManager(new ZkConnection(ZK_ADDR), HelixZkClient.DEFAULT_CONNECTION_TIMEOUT,
+ null);
+ Assert.assertTrue(zkConnectionManager.waitUntilConnected(1, TimeUnit.SECONDS));
+
+ // This client can write/read from ZK
+ zkConnectionManager.createPersistent(TEST_PATH, true);
+ zkConnectionManager.writeData(TEST_PATH, "Test");
+ Assert.assertTrue(zkConnectionManager.readData(TEST_PATH) != null);
+ zkConnectionManager.deleteRecursively(TEST_ROOT);
+
+ // This client can be shared, and cannot close when sharing
+ SharedZkClient sharedZkClient =
+ new SharedZkClient(zkConnectionManager, new HelixZkClient.ZkClientConfig(), null);
+ try {
+ zkConnectionManager.close();
+ Assert.fail("Dedicated ZkClient cannot be closed while sharing!");
+ } catch (HelixException hex) {
+ // expected
+ }
+
+ // This client can be closed normally when sharing ends
+ sharedZkClient.close();
+ Assert.assertTrue(sharedZkClient.isClosed());
+ Assert.assertFalse(sharedZkClient.waitUntilConnected(100, TimeUnit.MILLISECONDS));
+
+ zkConnectionManager.close();
+ Assert.assertTrue(zkConnectionManager.isClosed());
+ Assert.assertFalse(zkConnectionManager.waitUntilConnected(100, TimeUnit.MILLISECONDS));
+
+ // Sharing a closed dedicated ZkClient shall fail
+ try {
+ new SharedZkClient(zkConnectionManager, new HelixZkClient.ZkClientConfig(), null);
+ Assert.fail("Sharing a closed dedicated ZkClient shall fail.");
+ } catch (HelixException hex) {
+ // expected
+ }
+ }
+
+ @Test(dependsOnMethods = "testZkConnectionManager") public void testSharingZkClient()
+ throws Exception {
+ final String TEST_ROOT = "/testSharedZkClient/IDEALSTATES";
+ final String TEST_PATH = TEST_ROOT + TEST_NODE;
+
+ // A factory just for this test, to avoid interference from other tests running in parallel.
+ final SharedZkClientFactory testFactory = new SharedZkClientFactory();
+
+ HelixZkClient.ZkConnectionConfig connectionConfig =
+ new HelixZkClient.ZkConnectionConfig(ZK_ADDR);
+ HelixZkClient sharedZkClientA =
+ testFactory.buildZkClient(connectionConfig, new HelixZkClient.ZkClientConfig());
+ Assert.assertTrue(sharedZkClientA.waitUntilConnected(1, TimeUnit.SECONDS));
+
+ HelixZkClient sharedZkClientB =
+ testFactory.buildZkClient(connectionConfig, new HelixZkClient.ZkClientConfig());
+ Assert.assertTrue(sharedZkClientB.waitUntilConnected(1, TimeUnit.SECONDS));
+
+ Assert.assertEquals(testFactory.getActiveConnectionCount(), 1);
+
+ // Clients A and B are sharing the same session.
+ Assert.assertEquals(sharedZkClientA.getSessionId(), sharedZkClientB.getSessionId());
+ long sessionId = sharedZkClientA.getSessionId();
+
+ final int[] notificationCountA = { 0, 0 };
+ sharedZkClientA.subscribeDataChanges(TEST_PATH, new IZkDataListener() {
+ @Override public void handleDataChange(String s, Object o) {
+ notificationCountA[0]++;
+ }
+
+ @Override public void handleDataDeleted(String s) {
+ notificationCountA[1]++;
+ }
+ });
+ final int[] notificationCountB = { 0, 0 };
+ sharedZkClientB.subscribeDataChanges(TEST_PATH, new IZkDataListener() {
+ @Override public void handleDataChange(String s, Object o) {
+ notificationCountB[0]++;
+ }
+
+ @Override public void handleDataDeleted(String s) {
+ notificationCountB[1]++;
+ }
+ });
+
+ // Modify using client A and client B will get notification.
+ sharedZkClientA.createPersistent(TEST_PATH, true);
+ Assert.assertTrue(TestHelper.verify(new TestHelper.Verifier() {
+ @Override public boolean verify() {
+ return notificationCountB[0] == 1;
+ }
+ }, 1000));
+ Assert.assertEquals(notificationCountB[1], 0);
+
+ sharedZkClientA.deleteRecursively(TEST_ROOT);
+ Assert.assertTrue(TestHelper.verify(new TestHelper.Verifier() {
+ @Override public boolean verify() {
+ return notificationCountB[1] == 1;
+ }
+ }, 1000));
+ Assert.assertEquals(notificationCountB[0], 1);
+
+ try {
+ sharedZkClientA.createEphemeral(TEST_PATH, true);
+ Assert.fail("Create Ephemeral nodes using shared client should fail.");
+ } catch (HelixException he) {
+ // expected.
+ }
+
+ sharedZkClientA.close();
+ // Shared client A closed.
+ Assert.assertTrue(sharedZkClientA.isClosed());
+ Assert.assertFalse(sharedZkClientA.waitUntilConnected(100, TimeUnit.MILLISECONDS));
+ // Shared client B still open.
+ Assert.assertFalse(sharedZkClientB.isClosed());
+ Assert.assertTrue(sharedZkClientB.waitUntilConnected(100, TimeUnit.MILLISECONDS));
+
+ // Client A cannot make any modifications once closed.
+ try {
+ sharedZkClientA.createPersistent(TEST_PATH, true);
+ Assert.fail("Should not be able to create node with a closed client.");
+ } catch (Exception e) {
+ // expected to be here.
+ }
+
+ // Now modify using client B, and client A won't get notification.
+ sharedZkClientB.createPersistent(TEST_PATH, true);
+ Assert.assertTrue(TestHelper.verify(new TestHelper.Verifier() {
+ @Override public boolean verify() {
+ return notificationCountB[0] == 2;
+ }
+ }, 1000));
+ Assert.assertFalse(TestHelper.verify(new TestHelper.Verifier() {
+ @Override public boolean verify() {
+ return notificationCountA[0] == 2;
+ }
+ }, 1000));
+ sharedZkClientB.deleteRecursively(TEST_ROOT);
+
+ Assert.assertEquals(testFactory.getActiveConnectionCount(), 1);
+
+ sharedZkClientB.close();
+ // Shared client B closed.
+ Assert.assertTrue(sharedZkClientB.isClosed());
+ Assert.assertFalse(sharedZkClientB.waitUntilConnected(100, TimeUnit.MILLISECONDS));
+ Assert.assertEquals(testFactory.getActiveConnectionCount(), 0);
+
+ // Try to create new shared ZkClient, will get a different session
+ HelixZkClient sharedZkClientC =
+ testFactory.buildZkClient(connectionConfig, new HelixZkClient.ZkClientConfig());
+ Assert.assertFalse(sessionId == sharedZkClientC.getSessionId());
+ Assert.assertEquals(testFactory.getActiveConnectionCount(), 1);
+
+ sharedZkClientC.close();
+ // Shared client C closed.
+ Assert.assertTrue(sharedZkClientC.isClosed());
+ Assert.assertFalse(sharedZkClientC.waitUntilConnected(100, TimeUnit.MILLISECONDS));
+ Assert.assertEquals(testFactory.getActiveConnectionCount(), 0);
+ }
+}
[2/2] helix git commit: Introduce Helix ZkClient factory. And use the
factory to generate new ZkClient in the critical Helix components.
Posted by jx...@apache.org.
Introduce Helix ZkClient factory. And use the factory to generate new ZkClient in the critical Helix components.
The motivation of this change is sharing ZkConnection as much as possible.
DedicatedZkClient: the client that uses its own connection.
SharedZkClient: the client that shares a ZkConnection with other shared clients.
This change also defines a safer client interface, HelixZkClient, so as to hide the internal ZkConnection.
For the critical Helix components, the plan is:
- HelixManager (CONTROLLER, PARTICIPANT, CONTROLLER_PARTICIPANT, SPECTATOR): Dedicated ZkClient
- HelixManager (ADMINISTRATOR): Shared ZkClient
- HelixPropertyStore: Shared ZkClient
- HelixZkAccess or ZkAdmin: Shared ZkClient
ZkClient Guideline
- DedicatedZkClient
Isolated, no latency concern.
- SharedZkClient
Eco-friendly, but could introduce more latency since multiple watchers are handled sequentially in a single connection.
Multiple clients will use the same session, so a major difference between shared clients and dedicated clients is that closing a shared client does not close the session automatically. Given this limitation, creating ephemeral nodes using a shared ZkClient is not supported.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/7bb55742
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/7bb55742
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/7bb55742
Branch: refs/heads/master
Commit: 7bb55742e2fe2b61c634dd559cf86a71da50fcdf
Parents: 281f5d1
Author: Jiajun Wang <jj...@linkedin.com>
Authored: Fri Aug 10 10:31:36 2018 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Mon Oct 29 17:40:24 2018 -0700
----------------------------------------------------------------------
.../java/org/apache/helix/ConfigAccessor.java | 6 +-
.../helix/manager/zk/CallbackHandler.java | 11 +-
.../helix/manager/zk/ParticipantManager.java | 4 +-
.../apache/helix/manager/zk/ZKHelixAdmin.java | 50 +--
.../apache/helix/manager/zk/ZKHelixManager.java | 52 ++--
.../org/apache/helix/manager/zk/ZKUtil.java | 27 +-
.../helix/manager/zk/ZkBaseDataAccessor.java | 78 +----
.../manager/zk/ZkCacheBaseDataAccessor.java | 48 +--
.../org/apache/helix/manager/zk/ZkClient.java | 27 +-
.../zk/client/DedicatedZkClientFactory.java | 35 +++
.../helix/manager/zk/client/HelixZkClient.java | 306 +++++++++++++++++++
.../manager/zk/client/HelixZkClientFactory.java | 46 +++
.../helix/manager/zk/client/SharedZkClient.java | 92 ++++++
.../zk/client/SharedZkClientFactory.java | 87 ++++++
.../manager/zk/client/ZkConnectionManager.java | 120 ++++++++
.../helix/manager/zk/zookeeper/ZkClient.java | 146 +++++----
.../participant/HelixCustomCodeRunner.java | 16 +-
.../org/apache/helix/tools/MessagePoster.java | 8 +-
.../org/apache/helix/tools/TestExecutor.java | 13 +-
.../helix/tools/commandtools/ZKDumper.java | 8 +-
.../apache/helix/tools/commandtools/ZkCopy.java | 36 +--
.../integration/TestResourceGroupEndtoEnd.java | 2 +-
.../manager/ClusterControllerManager.java | 2 +-
.../manager/ClusterDistributedController.java | 2 +-
.../manager/MockParticipantManager.java | 2 +-
.../helix/manager/zk/TestRawZkClient.java | 294 ++++++++++++++++++
.../apache/helix/manager/zk/TestZkClient.java | 294 ------------------
.../helix/manager/zk/TestZkReconnect.java | 5 +-
.../manager/zk/client/TestHelixZkClient.java | 197 ++++++++++++
29 files changed, 1464 insertions(+), 550 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
index 70df719..2755113 100644
--- a/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/ConfigAccessor.java
@@ -27,12 +27,12 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import org.apache.helix.manager.zk.client.HelixZkClient;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ConfigScope;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
import org.apache.helix.manager.zk.ZKUtil;
-import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
@@ -66,13 +66,13 @@ public class ConfigAccessor {
// @formatter:on
}
- private final ZkClient zkClient;
+ private final HelixZkClient zkClient;
/**
* Initialize an accessor with a Zookeeper client
* @param zkClient
*/
- public ConfigAccessor(ZkClient zkClient) {
+ public ConfigAccessor(HelixZkClient zkClient) {
this.zkClient = zkClient;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
index cd446e8..b6d452d 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
@@ -57,6 +57,7 @@ import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyPathConfig;
import org.apache.helix.ZNRecord;
import org.apache.helix.common.DedupEventProcessor;
+import org.apache.helix.manager.zk.client.HelixZkClient;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ExternalView;
@@ -94,7 +95,7 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
private final Set<EventType> _eventTypes;
private final HelixDataAccessor _accessor;
private final ChangeType _changeType;
- private final ZkClient _zkClient;
+ private final HelixZkClient _zkClient;
private final AtomicLong _lastNotificationTimeStamp;
private final HelixManager _manager;
private final PropertyKey _propertyKey;
@@ -178,14 +179,22 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
*/
private List<NotificationContext.Type> _expectTypes = nextNotificationType.get(Type.FINALIZE);
+ @Deprecated
public CallbackHandler(HelixManager manager, ZkClient client, PropertyKey propertyKey,
Object listener, EventType[] eventTypes, ChangeType changeType) {
this(manager, client, propertyKey, listener, eventTypes, changeType, null);
}
+ @Deprecated
public CallbackHandler(HelixManager manager, ZkClient client, PropertyKey propertyKey,
Object listener, EventType[] eventTypes, ChangeType changeType,
HelixCallbackMonitor monitor) {
+ this(manager, (HelixZkClient) client, propertyKey, listener, eventTypes, changeType, monitor);
+ }
+
+ public CallbackHandler(HelixManager manager, HelixZkClient client, PropertyKey propertyKey,
+ Object listener, EventType[] eventTypes, ChangeType changeType,
+ HelixCallbackMonitor monitor) {
if (listener == null) {
throw new HelixException("listener could not be null");
}
http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
index c1d96c8..36fb969 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
@@ -39,6 +39,7 @@ import org.apache.helix.PreConnectCallback;
import org.apache.helix.PropertyKey;
import org.apache.helix.ZNRecord;
import org.apache.helix.ZNRecordBucketizer;
+import org.apache.helix.manager.zk.client.HelixZkClient;
import org.apache.helix.messaging.DefaultMessagingService;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.HelixConfigScope;
@@ -47,7 +48,6 @@ import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.ParticipantHistory;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
-import org.apache.helix.model.Message.MessageType;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.participant.statemachine.ScheduledTaskStateModelFactory;
@@ -61,7 +61,7 @@ import org.apache.zookeeper.data.Stat;
public class ParticipantManager {
private static Logger LOG = LoggerFactory.getLogger(ParticipantManager.class);
- final ZkClient _zkclient;
+ final HelixZkClient _zkclient;
final HelixManager _manager;
final PropertyKey.Builder _keyBuilder;
final String _clusterName;
http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 59336fd..0f79175 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -36,6 +36,7 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+
import org.I0Itec.zkclient.DataUpdater;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.apache.helix.AccessOption;
@@ -53,6 +54,8 @@ import org.apache.helix.PropertyPathBuilder;
import org.apache.helix.PropertyType;
import org.apache.helix.ZNRecord;
import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ClusterConstraints;
import org.apache.helix.model.ClusterConstraints.ConstraintType;
@@ -78,20 +81,23 @@ import org.slf4j.LoggerFactory;
public class ZKHelixAdmin implements HelixAdmin {
public static final String CONNECTION_TIMEOUT = "helixAdmin.timeOutInSec";
- private final ZkClient _zkClient;
+ private final HelixZkClient _zkClient;
private final ConfigAccessor _configAccessor;
private static Logger logger = LoggerFactory.getLogger(ZKHelixAdmin.class);
- public ZKHelixAdmin(ZkClient zkClient) {
+ public ZKHelixAdmin(HelixZkClient zkClient) {
_zkClient = zkClient;
_configAccessor = new ConfigAccessor(zkClient);
}
public ZKHelixAdmin(String zkAddress) {
int timeOutInSec = Integer.parseInt(System.getProperty(CONNECTION_TIMEOUT, "30"));
- _zkClient = new ZkClient(zkAddress, timeOutInSec * 1000);
- _zkClient.setZkSerializer(new ZNRecordSerializer());
+ HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
+ clientConfig.setZkSerializer(new ZNRecordSerializer())
+ .setConnectInitTimeout(timeOutInSec * 1000);
+ _zkClient = SharedZkClientFactory.getInstance()
+ .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress), clientConfig);
_zkClient.waitUntilConnected(timeOutInSec, TimeUnit.SECONDS);
_configAccessor = new ConfigAccessor(_zkClient);
}
@@ -207,8 +213,7 @@ public class ZKHelixAdmin implements HelixAdmin {
}
@Override
- public void enableInstance(String clusterName, List<String> instances,
- boolean enabled) {
+ public void enableInstance(String clusterName, List<String> instances, boolean enabled) {
// TODO: Reenable this after storage node bug fixed.
if (true) {
throw new HelixException("Current batch enable/disable instances are temporarily disabled!");
@@ -698,8 +703,7 @@ public class ZKHelixAdmin implements HelixAdmin {
}
@Override
- public void addResource(String clusterName, String resourceName,
- IdealState idealstate) {
+ public void addResource(String clusterName, String resourceName, IdealState idealstate) {
logger.info("Add resource {} in cluster {}.", resourceName, clusterName);
String stateModelRef = idealstate.getStateModelDefRef();
String stateModelDefPath = PropertyPathBuilder.stateModelDef(clusterName, stateModelRef);
@@ -874,8 +878,7 @@ public class ZKHelixAdmin implements HelixAdmin {
}
@Override
- public StateModelDefinition getStateModelDef(String clusterName,
- String stateModelName) {
+ public StateModelDefinition getStateModelDef(String clusterName, String stateModelName) {
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
Builder keyBuilder = accessor.keyBuilder();
@@ -965,8 +968,8 @@ public class ZKHelixAdmin implements HelixAdmin {
}
@Override
- public void rebalance(String clusterName, String resourceName, int replica,
- String keyPrefix, String group) {
+ public void rebalance(String clusterName, String resourceName, int replica, String keyPrefix,
+ String group) {
List<String> instanceNames = new LinkedList<String>();
if (keyPrefix == null || keyPrefix.length() == 0) {
keyPrefix = resourceName;
@@ -1074,8 +1077,8 @@ public class ZKHelixAdmin implements HelixAdmin {
}
@Override
- public void addIdealState(String clusterName, String resourceName,
- String idealStateFile) throws IOException {
+ public void addIdealState(String clusterName, String resourceName, String idealStateFile)
+ throws IOException {
logger.info("Add IdealState for resource {} to cluster {} by file name {}.", resourceName,
clusterName, idealStateFile);
ZNRecord idealStateRecord =
@@ -1132,9 +1135,9 @@ public class ZKHelixAdmin implements HelixAdmin {
baseAccessor.update(path, new DataUpdater<ZNRecord>() {
@Override
public ZNRecord update(ZNRecord currentData) {
- ClusterConstraints constraints = currentData == null
- ? new ClusterConstraints(constraintType)
- : new ClusterConstraints(currentData);
+ ClusterConstraints constraints = currentData == null ?
+ new ClusterConstraints(constraintType) :
+ new ClusterConstraints(currentData);
constraints.addConstraintItem(constraintId, constraintItem);
return constraints.getRecord();
@@ -1167,8 +1170,7 @@ public class ZKHelixAdmin implements HelixAdmin {
}
@Override
- public ClusterConstraints getConstraints(String clusterName,
- ConstraintType constraintType) {
+ public ClusterConstraints getConstraints(String clusterName, ConstraintType constraintType) {
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
@@ -1183,7 +1185,6 @@ public class ZKHelixAdmin implements HelixAdmin {
* @param clusterName
* @param currentIdealState
* @param instanceNames
- *
* @return
*/
@Override
@@ -1250,7 +1251,8 @@ public class ZKHelixAdmin implements HelixAdmin {
}
if (!ZKUtil.isInstanceSetup(_zkClient, clusterName, instanceName, InstanceType.PARTICIPANT)) {
- throw new HelixException("cluster " + clusterName + " instance " + instanceName + " is not setup yet");
+ throw new HelixException(
+ "cluster " + clusterName + " instance " + instanceName + " is not setup yet");
}
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
@@ -1318,8 +1320,7 @@ public class ZKHelixAdmin implements HelixAdmin {
}
@Override
- public void enableBatchMessageMode(String clusterName, String resourceName,
- boolean enabled) {
+ public void enableBatchMessageMode(String clusterName, String resourceName, boolean enabled) {
logger.info("{} batch message mode for resource {} in cluster {}.",
enabled ? "Enable" : "Disable", resourceName, clusterName);
// TODO: Change IdealState to ResourceConfig when configs are migrated to ResourceConfig
@@ -1425,7 +1426,8 @@ public class ZKHelixAdmin implements HelixAdmin {
return instances;
}
- @Override public void close() {
+ @Override
+ public void close() {
if (_zkClient != null) {
_zkClient.close();
}
http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index e6fabc1..98e2737 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -20,12 +20,11 @@ package org.apache.helix.manager.zk;
*/
import org.I0Itec.zkclient.IZkStateListener;
-import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.apache.helix.*;
import org.apache.helix.HelixConstants.ChangeType;
import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.api.listeners.*;
+import org.apache.helix.api.listeners.ClusterConfigChangeListener;
import org.apache.helix.api.listeners.ConfigChangeListener;
import org.apache.helix.api.listeners.ControllerChangeListener;
import org.apache.helix.api.listeners.CurrentStateChangeListener;
@@ -34,11 +33,15 @@ import org.apache.helix.api.listeners.IdealStateChangeListener;
import org.apache.helix.api.listeners.InstanceConfigChangeListener;
import org.apache.helix.api.listeners.LiveInstanceChangeListener;
import org.apache.helix.api.listeners.MessageListener;
+import org.apache.helix.api.listeners.ResourceConfigChangeListener;
import org.apache.helix.api.listeners.ScopedConfigChangeListener;
import org.apache.helix.controller.GenericHelixController;
import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
import org.apache.helix.healthcheck.ParticipantHealthReportTask;
+import org.apache.helix.manager.zk.client.DedicatedZkClientFactory;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
import org.apache.helix.messaging.DefaultMessagingService;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
@@ -65,7 +68,6 @@ import java.util.Map;
import java.util.Timer;
import java.util.concurrent.TimeUnit;
-
public class ZKHelixManager implements HelixManager, IZkStateListener {
private static Logger LOG = LoggerFactory.getLogger(ZKHelixManager.class);
@@ -92,7 +94,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
private final String _version;
private int _reportLatency;
- protected ZkClient _zkclient = null;
+ protected HelixZkClient _zkclient = null;
private final DefaultMessagingService _messagingService;
private Map<ChangeType, HelixCallbackMonitor> _callbackMonitors;
@@ -229,11 +231,11 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
ZKHelixManager.DEFAULT_MAX_DISCONNECT_THRESHOLD);
_sessionTimeout = HelixUtil.getSystemPropertyAsInt(SystemPropertyKeys.ZK_SESSION_TIMEOUT,
- ZkClient.DEFAULT_SESSION_TIMEOUT);
+ HelixZkClient.DEFAULT_SESSION_TIMEOUT);
_connectionInitTimeout = HelixUtil
.getSystemPropertyAsInt(SystemPropertyKeys.ZK_CONNECTION_TIMEOUT,
- ZkClient.DEFAULT_CONNECTION_TIMEOUT);
+ HelixZkClient.DEFAULT_CONNECTION_TIMEOUT);
_waitForConnectedTimeout = HelixUtil
.getSystemPropertyAsInt(SystemPropertyKeys.ZK_WAIT_CONNECTED_TIMEOUT,
@@ -594,17 +596,29 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
PathBasedZkSerializer zkSerializer =
ChainedPathZkSerializer.builder(new ZNRecordStreamingSerializer()).build();
- ZkClient.Builder zkClientBuilder = new ZkClient.Builder();
- zkClientBuilder.setZkServer(_zkAddress)
- .setSessionTimeout(_sessionTimeout)
- .setConnectionTimeout(_connectionInitTimeout)
+ HelixZkClient.ZkConnectionConfig connectionConfig = new HelixZkClient.ZkConnectionConfig(_zkAddress);
+ connectionConfig.setSessionTimeout(_sessionTimeout);
+ HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
+ clientConfig
.setZkSerializer(zkSerializer)
+ .setConnectInitTimeout(_connectionInitTimeout)
.setMonitorType(_instanceType.name())
.setMonitorKey(_clusterName)
.setMonitorInstanceName(_instanceName)
- .setMonitorRootPathOnly(!_instanceType.equals(InstanceType.CONTROLLER) &&
- !_instanceType.equals(InstanceType.CONTROLLER_PARTICIPANT));
- ZkClient newClient = zkClientBuilder.build();
+ .setMonitorRootPathOnly(!_instanceType.equals(InstanceType.CONTROLLER) && !_instanceType
+ .equals(InstanceType.CONTROLLER_PARTICIPANT));
+
+ HelixZkClient newClient;
+ switch (_instanceType) {
+ case ADMINISTRATOR:
+ newClient = SharedZkClientFactory.getInstance().buildZkClient(connectionConfig, clientConfig);
+ break;
+ default:
+ newClient = DedicatedZkClientFactory
+ .getInstance().buildZkClient(connectionConfig, clientConfig);
+ break;
+ }
+
synchronized (this) {
if (_zkclient != null) {
_zkclient.close();
@@ -896,8 +910,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
continue;
}
- ZkConnection zkConnection = ((ZkConnection) _zkclient.getConnection());
- _sessionId = Long.toHexString(zkConnection.getZookeeper().getSessionId());
+ _sessionId = Long.toHexString(_zkclient.getSessionId());
/**
* at the time we read session-id, zkconnection might be lost again
@@ -906,8 +919,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
} while (!isConnected || "0".equals(_sessionId));
LOG.info("Handling new session, session id: " + _sessionId + ", instance: " + _instanceName
- + ", instanceTye: " + _instanceType + ", cluster: " + _clusterName + ", zkconnection: "
- + ((ZkConnection) _zkclient.getConnection()).getZookeeper());
+ + ", instanceTye: " + _instanceType + ", cluster: " + _clusterName);
}
void initHandlers(List<CallbackHandler> handlers) {
@@ -960,9 +972,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
public void handleStateChanged(KeeperState state) {
switch (state) {
case SyncConnected:
- ZkConnection zkConnection = (ZkConnection) _zkclient.getConnection();
- LOG.info("KeeperState: " + state + ", instance: " + _instanceName + ", type: " + _instanceType
- + ", zookeeper:" + zkConnection.getZookeeper());
+ LOG.info("KeeperState: " + state + ", instance: " + _instanceName + ", type: " + _instanceType);
break;
case Disconnected:
/**
@@ -1084,7 +1094,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
_participantManager.reset();
}
_participantManager =
- new ParticipantManager(this, _zkclient, _sessionTimeout, _liveInstanceInfoProvider,
+ new ParticipantManager(this, (ZkClient) _zkclient, _sessionTimeout, _liveInstanceInfoProvider,
_preConnectCallbacks);
_participantManager.handleNewSession();
}
http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
index d3ee0c7..f8d1826 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKUtil.java
@@ -28,6 +28,7 @@ import org.apache.helix.BaseDataAccessor;
import org.apache.helix.InstanceType;
import org.apache.helix.PropertyPathBuilder;
import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.client.HelixZkClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.zookeeper.CreateMode;
@@ -40,7 +41,7 @@ public final class ZKUtil {
private ZKUtil() {
}
- public static boolean isClusterSetup(String clusterName, ZkClient zkClient) {
+ public static boolean isClusterSetup(String clusterName, HelixZkClient zkClient) {
if (clusterName == null) {
logger.info("Fail to check cluster setup : cluster name is null!");
return false;
@@ -86,7 +87,7 @@ public final class ZKUtil {
return isValid;
}
- public static boolean isInstanceSetup(ZkClient zkclient, String clusterName, String instanceName,
+ public static boolean isInstanceSetup(HelixZkClient zkclient, String clusterName, String instanceName,
InstanceType type) {
if (type == InstanceType.PARTICIPANT || type == InstanceType.CONTROLLER_PARTICIPANT) {
ArrayList<String> requiredPaths = new ArrayList<>();
@@ -119,7 +120,7 @@ public final class ZKUtil {
return true;
}
- public static void createChildren(ZkClient client, String parentPath, List<ZNRecord> list) {
+ public static void createChildren(HelixZkClient client, String parentPath, List<ZNRecord> list) {
client.createPersistent(parentPath, true);
if (list != null) {
for (ZNRecord record : list) {
@@ -128,7 +129,7 @@ public final class ZKUtil {
}
}
- public static void createChildren(ZkClient client, String parentPath, ZNRecord nodeRecord) {
+ public static void createChildren(HelixZkClient client, String parentPath, ZNRecord nodeRecord) {
client.createPersistent(parentPath, true);
String id = nodeRecord.getId();
@@ -136,7 +137,7 @@ public final class ZKUtil {
client.createPersistent(temp, nodeRecord);
}
- public static void dropChildren(ZkClient client, String parentPath, List<ZNRecord> list) {
+ public static void dropChildren(HelixZkClient client, String parentPath, List<ZNRecord> list) {
// TODO: check if parentPath exists
if (list != null) {
for (ZNRecord record : list) {
@@ -145,14 +146,14 @@ public final class ZKUtil {
}
}
- public static void dropChildren(ZkClient client, String parentPath, ZNRecord nodeRecord) {
+ public static void dropChildren(HelixZkClient client, String parentPath, ZNRecord nodeRecord) {
// TODO: check if parentPath exists
String id = nodeRecord.getId();
String temp = parentPath + "/" + id;
client.deleteRecursively(temp);
}
- public static List<ZNRecord> getChildren(ZkClient client, String path) {
+ public static List<ZNRecord> getChildren(HelixZkClient client, String path) {
// parent watch will be set by zkClient
List<String> children = client.getChildren(path);
if (children == null || children.size() == 0) {
@@ -174,7 +175,7 @@ public final class ZKUtil {
return childRecords;
}
- public static void updateIfExists(ZkClient client, String path, final ZNRecord record,
+ public static void updateIfExists(HelixZkClient client, String path, final ZNRecord record,
boolean mergeOnUpdate) {
if (client.exists(path)) {
DataUpdater<Object> updater = new DataUpdater<Object>() {
@@ -187,7 +188,7 @@ public final class ZKUtil {
}
}
- public static void createOrMerge(ZkClient client, String path, final ZNRecord record,
+ public static void createOrMerge(HelixZkClient client, String path, final ZNRecord record,
final boolean persistent, final boolean mergeOnUpdate) {
int retryCount = 0;
while (retryCount < RETRYLIMIT) {
@@ -223,7 +224,7 @@ public final class ZKUtil {
}
}
- public static void createOrUpdate(ZkClient client, String path, final ZNRecord record,
+ public static void createOrUpdate(HelixZkClient client, String path, final ZNRecord record,
final boolean persistent, final boolean mergeOnUpdate) {
int retryCount = 0;
while (retryCount < RETRYLIMIT) {
@@ -252,7 +253,7 @@ public final class ZKUtil {
}
}
- public static void asyncCreateOrMerge(ZkClient client, String path, final ZNRecord record,
+ public static void asyncCreateOrMerge(HelixZkClient client, String path, final ZNRecord record,
final boolean persistent, final boolean mergeOnUpdate) {
try {
if (client.exists(path)) {
@@ -287,7 +288,7 @@ public final class ZKUtil {
}
}
- public static void createOrReplace(ZkClient client, String path, final ZNRecord record,
+ public static void createOrReplace(HelixZkClient client, String path, final ZNRecord record,
final boolean persistent) {
int retryCount = 0;
while (retryCount < RETRYLIMIT) {
@@ -313,7 +314,7 @@ public final class ZKUtil {
}
}
- public static void subtract(ZkClient client, String path, final ZNRecord recordTosubtract) {
+ public static void subtract(HelixZkClient client, String path, final ZNRecord recordTosubtract) {
int retryCount = 0;
while (retryCount < RETRYLIMIT) {
try {
http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
index 6811766..8d932c8 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
@@ -36,13 +36,13 @@ import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.apache.helix.AccessOption;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.HelixException;
-import org.apache.helix.ZNRecord;
import org.apache.helix.api.exceptions.HelixMetaDataAccessException;
import org.apache.helix.manager.zk.ZkAsyncCallbacks.CreateCallbackHandler;
import org.apache.helix.manager.zk.ZkAsyncCallbacks.DeleteCallbackHandler;
import org.apache.helix.manager.zk.ZkAsyncCallbacks.ExistsCallbackHandler;
import org.apache.helix.manager.zk.ZkAsyncCallbacks.GetDataCallbackHandler;
import org.apache.helix.manager.zk.ZkAsyncCallbacks.SetDataCallbackHandler;
+import org.apache.helix.manager.zk.client.HelixZkClient;
import org.apache.helix.store.zk.ZNode;
import org.apache.helix.util.HelixUtil;
import org.apache.zookeeper.CreateMode;
@@ -75,7 +75,7 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
public AccessResult() {
_retCode = RetCode.ERROR;
- _pathCreated = new ArrayList<String>();
+ _pathCreated = new ArrayList<>();
_stat = new Stat();
_updatedValue = null;
}
@@ -83,9 +83,9 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
private static Logger LOG = LoggerFactory.getLogger(ZkBaseDataAccessor.class);
- private final ZkClient _zkClient;
+ private final HelixZkClient _zkClient;
- public ZkBaseDataAccessor(ZkClient zkClient) {
+ public ZkBaseDataAccessor(HelixZkClient zkClient) {
if (zkClient == null) {
throw new NullPointerException("zkclient is null");
}
@@ -431,7 +431,6 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
return getChildren(parentPath, stats, options, false);
}
-
@Override
public List<T> getChildren(String parentPath, List<Stat> stats, int options, int retryCount,
int retryInterval) throws HelixException {
@@ -587,7 +586,7 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
}
List<String> parentPaths =
- new ArrayList<String>(Collections.<String> nCopies(paths.size(), null));
+ new ArrayList<>(Collections.<String>nCopies(paths.size(), null));
boolean failOnNoNode = false;
for (int i = 0; i < paths.size(); i++) {
@@ -658,7 +657,7 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
boolean[] needCreate = new boolean[paths.size()];
Arrays.fill(needCreate, true);
List<List<String>> pathsCreated =
- new ArrayList<List<String>>(Collections.<List<String>> nCopies(paths.size(), null));
+ new ArrayList<>(Collections.<List<String>>nCopies(paths.size(), null));
long startT = System.nanoTime();
try {
@@ -712,7 +711,7 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
return success;
}
- List<Stat> setStats = new ArrayList<Stat>(Collections.<Stat> nCopies(paths.size(), null));
+ List<Stat> setStats = new ArrayList<>(Collections.<Stat>nCopies(paths.size(), null));
SetDataCallbackHandler[] cbList = new SetDataCallbackHandler[paths.size()];
CreateCallbackHandler[] createCbList = null;
boolean[] needSet = new boolean[paths.size()];
@@ -1106,69 +1105,6 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
_zkClient.unsubscribeChildChanges(path, childListener);
}
- // simple test
- public static void main(String[] args) {
- ZkClient zkclient = new ZkClient("localhost:2191");
- zkclient.setZkSerializer(new ZNRecordSerializer());
- ZkBaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<ZNRecord>(zkclient);
-
- // test async create
- List<String> createPaths = Arrays.asList("/test/child1/child1", "/test/child2/child2");
- List<ZNRecord> createRecords = Arrays.asList(new ZNRecord("child1"), new ZNRecord("child2"));
-
- boolean[] needCreate = new boolean[createPaths.size()];
- Arrays.fill(needCreate, true);
- List<List<String>> pathsCreated =
- new ArrayList<List<String>>(Collections.<List<String>> nCopies(createPaths.size(), null));
- accessor.create(createPaths, createRecords, needCreate, pathsCreated, AccessOption.PERSISTENT);
- System.out.println("pathsCreated: " + pathsCreated);
-
- // test async set
- List<String> setPaths = Arrays.asList("/test/setChild1/setChild1", "/test/setChild2/setChild2");
- List<ZNRecord> setRecords = Arrays.asList(new ZNRecord("setChild1"), new ZNRecord("setChild2"));
-
- pathsCreated =
- new ArrayList<List<String>>(Collections.<List<String>> nCopies(setPaths.size(), null));
- boolean[] success =
- accessor.set(setPaths, setRecords, pathsCreated, null, AccessOption.PERSISTENT);
- System.out.println("pathsCreated: " + pathsCreated);
- System.out.println("setSuccess: " + Arrays.toString(success));
-
- // test async update
- List<String> updatePaths =
- Arrays.asList("/test/updateChild1/updateChild1", "/test/setChild2/setChild2");
- class TestUpdater implements DataUpdater<ZNRecord> {
- final ZNRecord _newData;
-
- public TestUpdater(ZNRecord newData) {
- _newData = newData;
- }
-
- @Override
- public ZNRecord update(ZNRecord currentData) {
- return _newData;
-
- }
- }
- List<DataUpdater<ZNRecord>> updaters =
- Arrays.asList((DataUpdater<ZNRecord>) new TestUpdater(new ZNRecord("updateChild1")),
- (DataUpdater<ZNRecord>) new TestUpdater(new ZNRecord("updateChild2")));
-
- pathsCreated =
- new ArrayList<List<String>>(Collections.<List<String>> nCopies(updatePaths.size(), null));
-
- List<ZNRecord> updateRecords =
- accessor.update(updatePaths, updaters, pathsCreated, null, AccessOption.PERSISTENT);
- for (int i = 0; i < updatePaths.size(); i++) {
- success[i] = updateRecords.get(i) != null;
- }
- System.out.println("pathsCreated: " + pathsCreated);
- System.out.println("updateSuccess: " + Arrays.toString(success));
-
- System.out.println("CLOSING");
- zkclient.close();
- }
-
/**
* Reset
*/
http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
index 73cd2ae..67bf46e 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
@@ -38,15 +38,17 @@ import org.apache.helix.AccessOption;
import org.apache.helix.HelixException;
import org.apache.helix.manager.zk.ZkAsyncCallbacks.CreateCallbackHandler;
import org.apache.helix.manager.zk.ZkBaseDataAccessor.RetCode;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.client.SharedZkClientFactory;
import org.apache.helix.store.HelixPropertyListener;
import org.apache.helix.store.HelixPropertyStore;
import org.apache.helix.store.zk.ZNode;
import org.apache.helix.util.PathUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.DataTree;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
private static final Logger LOG = LoggerFactory.getLogger(ZkCacheBaseDataAccessor.class);
@@ -67,7 +69,7 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
private final ReentrantLock _eventLock = new ReentrantLock();
private ZkCacheEventThread _eventThread;
- private ZkClient _zkclient = null;
+ private HelixZkClient _zkclient = null;
public ZkCacheBaseDataAccessor(ZkBaseDataAccessor<T> baseAccessor, List<String> wtCachePaths) {
this(baseAccessor, null, wtCachePaths, null);
@@ -109,13 +111,14 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
public ZkCacheBaseDataAccessor(String zkAddress, ZkSerializer serializer, String chrootPath,
List<String> wtCachePaths, List<String> zkCachePaths, String monitorType, String monitorkey) {
- ZkClient.Builder zkClientBuilder = new ZkClient.Builder();
- zkClientBuilder.setZkServer(zkAddress).setSessionTimeout(ZkClient.DEFAULT_SESSION_TIMEOUT)
- .setConnectionTimeout(ZkClient.DEFAULT_CONNECTION_TIMEOUT).setZkSerializer(serializer)
- .setMonitorType(monitorType).setMonitorKey(monitorkey);
- _zkclient = zkClientBuilder.build();
-
- _zkclient.waitUntilConnected(ZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
+ HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
+ clientConfig.setZkSerializer(serializer)
+ .setMonitorType(monitorType)
+ .setMonitorKey(monitorkey);
+ _zkclient = SharedZkClientFactory.getInstance()
+ .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress), clientConfig);
+
+ _zkclient.waitUntilConnected(HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
_baseAccessor = new ZkBaseDataAccessor<>(_zkclient);
if (chrootPath == null || chrootPath.equals("/")) {
@@ -196,8 +199,8 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
if (cache == null && path.startsWith(cachePath)) {
cache = _cacheMap.get(cachePath);
} else if (cache != null && cache != _cacheMap.get(cachePath)) {
- throw new IllegalArgumentException("Couldn't do cross-cache async operations. paths: "
- + paths);
+ throw new IllegalArgumentException(
+ "Couldn't do cross-cache async operations. paths: " + paths);
}
}
}
@@ -368,9 +371,8 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
// if cache miss, fall back to zk and update cache
try {
cache.lockWrite();
- record =
- _baseAccessor
- .get(serverPath, stat, options | AccessOption.THROW_EXCEPTION_IFNOTEXIST);
+ record = _baseAccessor
+ .get(serverPath, stat, options | AccessOption.THROW_EXCEPTION_IFNOTEXIST);
cache.update(serverPath, record, stat);
} catch (ZkNoNodeException e) {
if (AccessOption.isThrowExceptionIfNotExist(options)) {
@@ -433,7 +435,7 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
boolean[] needCreate = new boolean[size];
Arrays.fill(needCreate, true);
List<List<String>> pathsCreatedList =
- new ArrayList<List<String>>(Collections.<List<String>> nCopies(size, null));
+ new ArrayList<List<String>>(Collections.<List<String>>nCopies(size, null));
CreateCallbackHandler[] createCbList =
_baseAccessor.create(serverPaths, records, needCreate, pathsCreatedList, options);
@@ -467,7 +469,7 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
cache.lockWrite();
List<Stat> setStats = new ArrayList<Stat>();
List<List<String>> pathsCreatedList =
- new ArrayList<List<String>>(Collections.<List<String>> nCopies(size, null));
+ new ArrayList<List<String>>(Collections.<List<String>>nCopies(size, null));
boolean[] success =
_baseAccessor.set(serverPaths, records, pathsCreatedList, setStats, options);
@@ -498,7 +500,7 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
List<Stat> setStats = new ArrayList<Stat>();
boolean[] success = new boolean[size];
List<List<String>> pathsCreatedList =
- new ArrayList<List<String>>(Collections.<List<String>> nCopies(size, null));
+ new ArrayList<List<String>>(Collections.<List<String>>nCopies(size, null));
List<T> updateData =
_baseAccessor.update(serverPaths, updaters, pathsCreatedList, setStats, options);
@@ -577,8 +579,8 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
final int size = paths.size();
List<String> serverPaths = prependChroot(paths);
- List<T> records = new ArrayList<T>(Collections.<T> nCopies(size, null));
- List<Stat> readStats = new ArrayList<Stat>(Collections.<Stat> nCopies(size, null));
+ List<T> records = new ArrayList<T>(Collections.<T>nCopies(size, null));
+ List<Stat> readStats = new ArrayList<Stat>(Collections.<Stat>nCopies(size, null));
boolean needRead = false;
boolean needReads[] = new boolean[size]; // init to false
@@ -647,7 +649,7 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
// System.out.println("zk-cache");
ZNode znode = cache.get(serverParentPath);
- if (znode != null && znode.getChildSet() != Collections.<String> emptySet()) {
+ if (znode != null && znode.getChildSet() != Collections.<String>emptySet()) {
// System.out.println("zk-cache-hit: " + parentPath);
List<String> childNames = new ArrayList<String>(znode.getChildSet());
Collections.sort(childNames);
@@ -689,8 +691,8 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
}
@Override
- public List<T> getChildren(String parentPath, List<Stat> stats, int options,
- int retryCount, int retryInterval) throws HelixException {
+ public List<T> getChildren(String parentPath, List<Stat> stats, int options, int retryCount,
+ int retryInterval) throws HelixException {
return getChildren(parentPath, stats, options);
}
http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
index 8762585..89676db 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkClient.java
@@ -24,17 +24,36 @@ import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.helix.HelixException;
+import org.apache.helix.manager.zk.client.HelixZkClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
/**
- * This is a wrapper of {@link org.apache.helix.manager.zk.zookeeper.ZkClient},
+ * Raw ZkClient that wraps {@link org.apache.helix.manager.zk.zookeeper.ZkClient},
* with additional constructors and builder.
*
- * // TODO: we will need to merge two ZkClient into just one class.
+ * Note that, instead of directly constructing a raw ZkClient, applications should always use
+ * HelixZkClientFactory to build shared or dedicated HelixZkClient instances.
+ * Only construct a raw ZkClient when advanced usage is required,
+ * for example, when the application needs to access/manage the ZkConnection directly.
+ *
+ * Both SharedZkClient and DedicatedZkClient are built on top of the raw ZkClient, as shown below.
+ *                 ----------------------------
+ *                 |                          |
+ *       ---------------------                |
+ *       |                   |                | *implements
+ * SharedZkClient    DedicatedZkClient ----> HelixZkClient Interface
+ *       |                   |                |
+ *       ---------------------                |
+ *                 |                          |
+ *            Raw ZkClient (this class)--------
+ *                 |
+ *          Native ZkClient
+ *
+ * TODO Completely replace usage of the raw ZkClient within helix-core with HelixZkClient. --JJ
*/
-public class ZkClient extends org.apache.helix.manager.zk.zookeeper.ZkClient {
+
+public class ZkClient extends org.apache.helix.manager.zk.zookeeper.ZkClient implements HelixZkClient {
private static Logger LOG = LoggerFactory.getLogger(ZkClient.class);
public static final int DEFAULT_OPERATION_TIMEOUT = Integer.MAX_VALUE;
http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/main/java/org/apache/helix/manager/zk/client/DedicatedZkClientFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/client/DedicatedZkClientFactory.java b/helix-core/src/main/java/org/apache/helix/manager/zk/client/DedicatedZkClientFactory.java
new file mode 100644
index 0000000..edeb978
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/client/DedicatedZkClientFactory.java
@@ -0,0 +1,35 @@
+package org.apache.helix.manager.zk.client;
+
+import org.apache.helix.manager.zk.ZkClient;
+
+/**
+ * Singleton factory that build dedicated clients using the raw ZkClient.
+ */
+public class DedicatedZkClientFactory extends HelixZkClientFactory {
+
+ protected DedicatedZkClientFactory() {}
+
+ private static class SingletonHelper{
+ private static final DedicatedZkClientFactory INSTANCE = new DedicatedZkClientFactory();
+ }
+
+ public static DedicatedZkClientFactory getInstance(){
+ return SingletonHelper.INSTANCE;
+ }
+
+ /**
+ * Build a Dedicated ZkClient based on connection config and client config
+ *
+ * @param connectionConfig
+ * @param clientConfig
+ * @return
+ */
+ @Override
+ public HelixZkClient buildZkClient(HelixZkClient.ZkConnectionConfig connectionConfig,
+ HelixZkClient.ZkClientConfig clientConfig) {
+ return new ZkClient(createZkConnection(connectionConfig),
+ (int) clientConfig.getConnectInitTimeout(), clientConfig.getOperationRetryTimeout(),
+ clientConfig.getZkSerializer(), clientConfig.getMonitorType(), clientConfig.getMonitorKey(),
+ clientConfig.getMonitorInstanceName(), clientConfig.isMonitorRootPathOnly());
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/main/java/org/apache/helix/manager/zk/client/HelixZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/client/HelixZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/client/HelixZkClient.java
new file mode 100644
index 0000000..65e0027
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/client/HelixZkClient.java
@@ -0,0 +1,306 @@
+package org.apache.helix.manager.zk.client;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.I0Itec.zkclient.IZkStateListener;
+import org.I0Itec.zkclient.serialize.SerializableSerializer;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.helix.manager.zk.BasicZkSerializer;
+import org.apache.helix.manager.zk.PathBasedZkSerializer;
+import org.apache.helix.manager.zk.ZkAsyncCallbacks;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.OpResult;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Helix ZkClient interface.
+ */
+public interface HelixZkClient {
+ int DEFAULT_OPERATION_TIMEOUT = Integer.MAX_VALUE;
+ int DEFAULT_CONNECTION_TIMEOUT = 60 * 1000;
+ int DEFAULT_SESSION_TIMEOUT = 30 * 1000;
+
+ // listener subscription
+ List<String> subscribeChildChanges(String path, IZkChildListener listener);
+
+ void unsubscribeChildChanges(String path, IZkChildListener listener);
+
+ void subscribeDataChanges(String path, IZkDataListener listener);
+
+ void unsubscribeDataChanges(String path, IZkDataListener listener);
+
+ void subscribeStateChanges(final IZkStateListener listener);
+
+ void unsubscribeStateChanges(IZkStateListener listener);
+
+ void unsubscribeAll();
+
+ // data access
+ void createPersistent(String path);
+
+ void createPersistent(String path, boolean createParents);
+
+ void createPersistent(String path, boolean createParents, List<ACL> acl);
+
+ void createPersistent(String path, Object data);
+
+ void createPersistent(String path, Object data, List<ACL> acl);
+
+ String createPersistentSequential(String path, Object data);
+
+ String createPersistentSequential(String path, Object data, List<ACL> acl);
+
+ void createEphemeral(final String path);
+
+ void createEphemeral(final String path, final List<ACL> acl);
+
+ String create(final String path, Object data, final CreateMode mode);
+
+ String create(final String path, Object datat, final List<ACL> acl, final CreateMode mode);
+
+ void createEphemeral(final String path, final Object data);
+
+ void createEphemeral(final String path, final Object data, final List<ACL> acl);
+
+ String createEphemeralSequential(final String path, final Object data);
+
+ String createEphemeralSequential(final String path, final Object data, final List<ACL> acl);
+
+ List<String> getChildren(String path);
+
+ int countChildren(String path);
+
+ boolean exists(final String path);
+
+ Stat getStat(final String path);
+
+ boolean waitUntilExists(String path, TimeUnit timeUnit, long time);
+
+ void deleteRecursively(String path);
+
+ boolean delete(final String path);
+
+ <T extends Object> T readData(String path);
+
+ <T extends Object> T readData(String path, boolean returnNullIfPathNotExists);
+
+ <T extends Object> T readData(String path, Stat stat);
+
+ <T extends Object> T readData(final String path, final Stat stat, final boolean watch);
+
+ <T extends Object> T readDataAndStat(String path, Stat stat, boolean returnNullIfPathNotExists);
+
+ void writeData(String path, Object object);
+
+ <T extends Object> void updateDataSerialized(String path, DataUpdater<T> updater);
+
+ void writeData(final String path, Object datat, final int expectedVersion);
+
+ Stat writeDataReturnStat(final String path, Object datat, final int expectedVersion);
+
+ Stat writeDataGetStat(final String path, Object datat, final int expectedVersion);
+
+ void asyncCreate(final String path, Object datat, final CreateMode mode,
+ final ZkAsyncCallbacks.CreateCallbackHandler cb);
+
+ void asyncSetData(final String path, Object datat, final int version,
+ final ZkAsyncCallbacks.SetDataCallbackHandler cb);
+
+ void asyncGetData(final String path, final ZkAsyncCallbacks.GetDataCallbackHandler cb);
+
+ void asyncExists(final String path, final ZkAsyncCallbacks.ExistsCallbackHandler cb);
+
+ void asyncDelete(final String path, final ZkAsyncCallbacks.DeleteCallbackHandler cb);
+
+ void watchForData(final String path);
+
+ List<String> watchForChilds(final String path);
+
+ long getCreationTime(String path);
+
+ List<OpResult> multi(final Iterable<Op> ops);
+
+ // ZK state control
+ boolean waitUntilConnected(long time, TimeUnit timeUnit);
+
+ String getServers();
+
+ long getSessionId();
+
+ void close();
+
+ boolean isClosed();
+
+ // other
+ byte[] serialize(Object data, String path);
+
+ <T extends Object> T deserialize(byte[] data, String path);
+
+ void setZkSerializer(ZkSerializer zkSerializer);
+
+ void setZkSerializer(PathBasedZkSerializer zkSerializer);
+
+ PathBasedZkSerializer getZkSerializer();
+
+ /**
+ * Configuration for creating a new ZkConnection.
+ * Two configs are equal when both their ZK server strings and their session timeouts are equal.
+ */
+ class ZkConnectionConfig {
+ // Connection configs
+ private final String _zkServers;
+ private int _sessionTimeout = HelixZkClient.DEFAULT_SESSION_TIMEOUT;
+
+ /**
+ * @param zkServers ZK server address string; stored as given, no validation is done here
+ */
+ public ZkConnectionConfig(String zkServers) {
+ _zkServers = zkServers;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (!(obj instanceof ZkConnectionConfig)) {
+ return false;
+ }
+ ZkConnectionConfig configObj = (ZkConnectionConfig) obj;
+ // A null server string only matches another null server string.
+ boolean sameServers = _zkServers == null ? configObj._zkServers == null
+ : _zkServers.equals(configObj._zkServers);
+ return sameServers && _sessionTimeout == configObj._sessionTimeout;
+ }
+
+ @Override
+ public int hashCode() {
+ // equals() accepts a null server string, so hashCode() must not throw on it either.
+ // The value for a non-null server string is unchanged.
+ int serversHash = _zkServers == null ? 0 : _zkServers.hashCode();
+ return _sessionTimeout * 31 + serversHash;
+ }
+
+ /**
+ * @return the server string and session timeout joined by "_", with every non-word
+ * character replaced by "_"
+ */
+ @Override
+ public String toString() {
+ return (_zkServers + "_" + _sessionTimeout).replaceAll("[\\W]", "_");
+ }
+
+ /**
+ * @param sessionTimeout new session timeout; must not be null because it is unboxed to an int
+ * @return this config, to allow chaining
+ */
+ public ZkConnectionConfig setSessionTimeout(Integer sessionTimeout) {
+ this._sessionTimeout = sessionTimeout;
+ return this;
+ }
+
+ public String getZkServers() {
+ return _zkServers;
+ }
+
+ public int getSessionTimeout() {
+ return _sessionTimeout;
+ }
+ }
+
+ /**
+ * Configuration for creating a new ZkClient with serializer and monitor.
+ * Every setter returns this config so that calls can be chained.
+ */
+ class ZkClientConfig {
+ // Timeout for the client to init the connection
+ private long _connectInitTimeout = HelixZkClient.DEFAULT_CONNECTION_TIMEOUT;
+
+ // Retry timeout for data access operations
+ private long _operationRetryTimeout = HelixZkClient.DEFAULT_OPERATION_TIMEOUT;
+
+ // Serializer; filled in lazily by getZkSerializer() when it was never set
+ private PathBasedZkSerializer _zkSerializer;
+
+ // Monitoring
+ private String _monitorType;
+ private String _monitorKey;
+ private String _monitorInstanceName;
+ private boolean _monitorRootPathOnly = true;
+
+ /**
+ * Use the given path-based serializer as is.
+ *
+ * @param zkSerializer
+ */
+ public ZkClientConfig setZkSerializer(PathBasedZkSerializer zkSerializer) {
+ _zkSerializer = zkSerializer;
+ return this;
+ }
+
+ /**
+ * Wrap the given serializer in a BasicZkSerializer and use the wrapper.
+ *
+ * @param zkSerializer
+ */
+ public ZkClientConfig setZkSerializer(ZkSerializer zkSerializer) {
+ PathBasedZkSerializer wrapped = new BasicZkSerializer(zkSerializer);
+ _zkSerializer = wrapped;
+ return this;
+ }
+
+ /**
+ * Used as part of the MBean ObjectName. This item is required for enabling monitoring.
+ *
+ * @param monitorType
+ */
+ public ZkClientConfig setMonitorType(String monitorType) {
+ _monitorType = monitorType;
+ return this;
+ }
+
+ /**
+ * Used as part of the MBean ObjectName. This item is required for enabling monitoring.
+ *
+ * @param monitorKey
+ */
+ public ZkClientConfig setMonitorKey(String monitorKey) {
+ _monitorKey = monitorKey;
+ return this;
+ }
+
+ /**
+ * Used as part of the MBean ObjectName. This item is optional.
+ *
+ * @param instanceName
+ */
+ public ZkClientConfig setMonitorInstanceName(String instanceName) {
+ _monitorInstanceName = instanceName;
+ return this;
+ }
+
+ /**
+ * @param monitorRootPathOnly must not be null because it is unboxed to a boolean
+ */
+ public ZkClientConfig setMonitorRootPathOnly(Boolean monitorRootPathOnly) {
+ _monitorRootPathOnly = monitorRootPathOnly;
+ return this;
+ }
+
+ /**
+ * @param operationRetryTimeout must not be null because it is unboxed to a long
+ */
+ public ZkClientConfig setOperationRetryTimeout(Long operationRetryTimeout) {
+ _operationRetryTimeout = operationRetryTimeout;
+ return this;
+ }
+
+ public ZkClientConfig setConnectInitTimeout(long _connectInitTimeout) {
+ // The parameter shadows the field of the same name, so "this." is required here.
+ this._connectInitTimeout = _connectInitTimeout;
+ return this;
+ }
+
+ /**
+ * @return the configured serializer; when none has been set, a BasicZkSerializer around a
+ * SerializableSerializer is created, remembered and returned
+ */
+ public PathBasedZkSerializer getZkSerializer() {
+ if (_zkSerializer != null) {
+ return _zkSerializer;
+ }
+ _zkSerializer = new BasicZkSerializer(new SerializableSerializer());
+ return _zkSerializer;
+ }
+
+ public long getOperationRetryTimeout() {
+ return _operationRetryTimeout;
+ }
+
+ public String getMonitorType() {
+ return _monitorType;
+ }
+
+ public String getMonitorKey() {
+ return _monitorKey;
+ }
+
+ public String getMonitorInstanceName() {
+ return _monitorInstanceName;
+ }
+
+ public boolean isMonitorRootPathOnly() {
+ return _monitorRootPathOnly;
+ }
+
+ public long getConnectInitTimeout() {
+ return _connectInitTimeout;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/main/java/org/apache/helix/manager/zk/client/HelixZkClientFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/client/HelixZkClientFactory.java b/helix-core/src/main/java/org/apache/helix/manager/zk/client/HelixZkClientFactory.java
new file mode 100644
index 0000000..9d10cd3
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/client/HelixZkClientFactory.java
@@ -0,0 +1,46 @@
+package org.apache.helix.manager.zk.client;
+
+import org.I0Itec.zkclient.IZkConnection;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.helix.HelixException;
+
+/**
+ * Abstract class of the ZkClient factory.
+ */
+abstract class HelixZkClientFactory {
+
+ /**
+ * Build a ZkClient using specified connection config and client config
+ *
+ * @param connectionConfig
+ * @param clientConfig
+ * @return HelixZkClient
+ */
+ public abstract HelixZkClient buildZkClient(HelixZkClient.ZkConnectionConfig connectionConfig,
+ HelixZkClient.ZkClientConfig clientConfig);
+
+ /**
+ * Build a ZkClient using specified connection config and default client config
+ *
+ * @param connectionConfig
+ * @return HelixZkClient
+ */
+ public HelixZkClient buildZkClient(HelixZkClient.ZkConnectionConfig connectionConfig) {
+ return buildZkClient(connectionConfig, new HelixZkClient.ZkClientConfig());
+ }
+
+ /**
+ * Construct a new ZkConnection instance based on connection configuration.
+ * Note that the connection is not really made until someone calls zkConnection.connect().
+ * @param connectionConfig
+ * @return
+ */
+ protected IZkConnection createZkConnection(HelixZkClient.ZkConnectionConfig connectionConfig) {
+ if (connectionConfig.getZkServers() == null) {
+ throw new HelixException(
+ "Failed to build ZkClient since no connection or ZK server address is specified.");
+ } else {
+ return new ZkConnection(connectionConfig.getZkServers(), connectionConfig.getSessionTimeout());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/main/java/org/apache/helix/manager/zk/client/SharedZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/client/SharedZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/client/SharedZkClient.java
new file mode 100644
index 0000000..242dea0
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/client/SharedZkClient.java
@@ -0,0 +1,92 @@
+package org.apache.helix.manager.zk.client;
+
+import java.util.List;
+
+import org.I0Itec.zkclient.IZkConnection;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.helix.HelixException;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A ZkClient that works on top of a shared ZkConnection.
+ * The shared ZkConnection is never manipulated directly by a SharedZkClient.
+ */
+class SharedZkClient extends org.apache.helix.manager.zk.ZkClient implements HelixZkClient {
+  private static Logger LOG = LoggerFactory.getLogger(SharedZkClient.class);
+  /*
+   * The shared ZkConnection cannot really be disconnected by this client, so a dummy
+   * ZkConnection is kept as a placeholder. It guarantees that the connection handed out is
+   * never null, even after this shared ZkClient instance has been closed, which avoids NPE.
+   */
+  private static final ZkConnection IDLE_CONNECTION = new ZkConnection("Dummy_ZkServers");
+  private final OnCloseCallback _onCloseCallback;
+  private final ZkConnectionManager _connectionManager;
+
+  interface OnCloseCallback {
+    /**
+     * Invoked once the ZkClient has been closed.
+     */
+    void onClose();
+  }
+
+  /**
+   * Creates a shared ZkClient on top of the connection owned by the given manager and
+   * registers the new client as a watcher on that manager.
+   *
+   * @param connectionManager The manager of the shared ZkConnection.
+   * @param clientConfig ZkClientConfig details to create the shared ZkClient.
+   * @param callback Clean up logic to run when the shared ZkClient is closed.
+   */
+  protected SharedZkClient(ZkConnectionManager connectionManager, ZkClientConfig clientConfig,
+      OnCloseCallback callback) {
+    super(
+        connectionManager.getConnection(),
+        0,
+        clientConfig.getOperationRetryTimeout(),
+        clientConfig.getZkSerializer(),
+        clientConfig.getMonitorType(),
+        clientConfig.getMonitorKey(),
+        clientConfig.getMonitorInstanceName(),
+        clientConfig.isMonitorRootPathOnly());
+    _connectionManager = connectionManager;
+    // Subscribe this client to the events received by the connection manager.
+    _connectionManager.registerWatcher(this);
+    _onCloseCallback = callback;
+  }
+
+  /**
+   * Closes this client; once it reports closed, unregisters it from the connection manager
+   * and fires the on-close callback.
+   */
+  @Override
+  public void close() {
+    super.close();
+    if (!isClosed()) {
+      return;
+    }
+    // The fields may still be unset if registration never completed during construction,
+    // hence the null checks.
+    if (_connectionManager != null) {
+      _connectionManager.unregisterWatcher(this);
+    }
+    if (_onCloseCallback != null) {
+      _onCloseCallback.onClose();
+    }
+  }
+
+  /**
+   * @return the dummy placeholder connection once this client is closed, otherwise the
+   *         connection reported by the parent class
+   */
+  @Override
+  public IZkConnection getConnection() {
+    return isClosed() ? IDLE_CONNECTION : super.getConnection();
+  }
+
+  /**
+   * The ZkConnection session is shared by this ZkClient, so ephemeral nodes must not be
+   * created through a SharedZkClient; any other create mode is delegated to the parent.
+   *
+   * @throws HelixException if {@code mode} is ephemeral
+   */
+  @Override
+  public String create(final String path, Object datat, final List<ACL> acl,
+      final CreateMode mode) {
+    if (!mode.isEphemeral()) {
+      return super.create(path, datat, acl, mode);
+    }
+    throw new HelixException(
+        "Create ephemeral nodes using a " + SharedZkClient.class.getSimpleName()
+            + " ZkClient is not supported.");
+  }
+
+  /**
+   * @return always false
+   */
+  @Override
+  protected boolean isManagingZkConnection() {
+    return false;
+  }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/main/java/org/apache/helix/manager/zk/client/SharedZkClientFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/client/SharedZkClientFactory.java b/helix-core/src/main/java/org/apache/helix/manager/zk/client/SharedZkClientFactory.java
new file mode 100644
index 0000000..ed4b5de
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/client/SharedZkClientFactory.java
@@ -0,0 +1,87 @@
+package org.apache.helix.manager.zk.client;
+
+import java.util.HashMap;
+
+import org.apache.helix.HelixException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Singleton factory that builds shared ZkClients, i.e. clients that use a pooled, shared
+ * ZkConnection.
+ */
+public class SharedZkClientFactory extends HelixZkClientFactory {
+  private static Logger LOG = LoggerFactory.getLogger(SharedZkClientFactory.class);
+  // The connection pool to track all created connections, keyed by connection config.
+  // NOTE(review): lookups presumably rely on ZkConnectionConfig implementing
+  // equals()/hashCode() -- confirm. All access is guarded by synchronizing on the map itself.
+  private final HashMap<HelixZkClient.ZkConnectionConfig, ZkConnectionManager>
+      _connectionManagerPool = new HashMap<>();
+
+  protected SharedZkClientFactory() {}
+
+  // Holder class for the singleton instance.
+  private static class SingletonHelper {
+    private static final SharedZkClientFactory INSTANCE = new SharedZkClientFactory();
+  }
+
+  /**
+   * @return the singleton factory instance
+   */
+  public static SharedZkClientFactory getInstance() {
+    return SingletonHelper.INSTANCE;
+  }
+
+  /**
+   * Build a shared ZkClient on top of a shared ZkConnection that matches the specified
+   * connection config.
+   *
+   * @param connectionConfig The connection configuration that is used to search for a shared
+   *                         connection. A new connection is created if none is usable.
+   * @param clientConfig configuration of the client; its connect-init timeout is used when a
+   *                     new shared connection has to be created
+   * @return Shared ZkClient
+   * @throws HelixException if no connection manager could be obtained from the pool
+   */
+  @Override
+  public HelixZkClient buildZkClient(HelixZkClient.ZkConnectionConfig connectionConfig,
+      HelixZkClient.ZkClientConfig clientConfig) {
+    synchronized (_connectionManagerPool) {
+      final ZkConnectionManager zkConnectionManager =
+          getOrCreateZkConnectionManager(connectionConfig, clientConfig.getConnectInitTimeout());
+      if (zkConnectionManager == null) {
+        throw new HelixException("Failed to create a connection manager in the pool to share.");
+      }
+      LOG.info("Sharing ZkConnection {} to a new SharedZkClient.", connectionConfig.toString());
+      return new SharedZkClient(zkConnectionManager, clientConfig,
+          new SharedZkClient.OnCloseCallback() {
+            @Override
+            public void onClose() {
+              cleanupConnectionManager(zkConnectionManager);
+            }
+          });
+    }
+  }
+
+  /**
+   * Look up the pooled connection manager for the config; create and pool a new one when
+   * there is none yet or the pooled one has been closed.
+   * Callers must hold the lock on the pool.
+   */
+  private ZkConnectionManager getOrCreateZkConnectionManager(
+      HelixZkClient.ZkConnectionConfig connectionConfig, long connectInitTimeout) {
+    ZkConnectionManager connectionManager = _connectionManagerPool.get(connectionConfig);
+    if (connectionManager == null || connectionManager.isClosed()) {
+      connectionManager = new ZkConnectionManager(createZkConnection(connectionConfig),
+          connectInitTimeout, connectionConfig.toString());
+      // A closed manager under the same key, if any, is replaced here.
+      _connectionManagerPool.put(connectionConfig, connectionManager);
+    }
+    return connectionManager;
+  }
+
+  // Close the ZkConnectionManager if no other shared client is referring to it.
+  // Note the close operation of connection manager needs to be synchronized with the pool
+  // operation to avoid race condition.
+  private void cleanupConnectionManager(ZkConnectionManager zkConnectionManager) {
+    synchronized (_connectionManagerPool) {
+      // close(true): the manager skips closing while watchers are still registered.
+      zkConnectionManager.close(true);
+    }
+  }
+
+  // For test only: counts the pooled connection managers that are not closed.
+  protected int getActiveConnectionCount() {
+    int count = 0;
+    synchronized (_connectionManagerPool) {
+      for (ZkConnectionManager manager : _connectionManagerPool.values()) {
+        if (!manager.isClosed()) {
+          count++;
+        }
+      }
+    }
+    return count;
+  }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/7bb55742/helix-core/src/main/java/org/apache/helix/manager/zk/client/ZkConnectionManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/client/ZkConnectionManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/client/ZkConnectionManager.java
new file mode 100644
index 0000000..0a9ddc1
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/client/ZkConnectionManager.java
@@ -0,0 +1,120 @@
+package org.apache.helix.manager.zk.client;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.I0Itec.zkclient.IZkConnection;
+import org.I0Itec.zkclient.serialize.SerializableSerializer;
+import org.apache.helix.HelixException;
+import org.apache.helix.manager.zk.BasicZkSerializer;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A ZkConnection manager that maintains connection status and allows additional watchers to
+ * be registered. It forwards the events it receives to those watchers.
+ *
+ * TODO Separate connection management logic from the raw ZkClient class.
+ * So this manager is a peer to the ZkClient. Connection Manager for maintaining the connection
+ * and ZkClient to handle user request.
+ * After this is done, Dedicated ZkClient hires one manager for its connection.
+ * While multiple Shared ZkClients can use single connection manager if possible.
+ */
+class ZkConnectionManager extends org.apache.helix.manager.zk.ZkClient {
+  private static Logger LOG = LoggerFactory.getLogger(ZkConnectionManager.class);
+  // Client type that is used in monitor, and metrics.
+  private final static String MONITOR_TYPE = "ZkConnectionManager";
+  private final String _monitorKey;
+  // Set of all registered watchers; accessed only from synchronized methods.
+  private final Set<Watcher> _sharedWatchers = new HashSet<>();
+
+  /**
+   * Construct and init a ZkConnection Manager.
+   *
+   * @param zkConnection the connection to manage
+   * @param connectionTimeout connection timeout handed to the parent client; values outside
+   *                          the int range are saturated instead of silently wrapping around
+   * @param monitorKey key used for monitoring and for identifying this connection in logs
+   */
+  protected ZkConnectionManager(IZkConnection zkConnection, long connectionTimeout,
+      String monitorKey) {
+    super(zkConnection, saturateToInt(connectionTimeout), HelixZkClient.DEFAULT_OPERATION_TIMEOUT,
+        new BasicZkSerializer(new SerializableSerializer()), MONITOR_TYPE, monitorKey, null, true);
+    _monitorKey = monitorKey;
+    LOG.info("ZkConnection {} was created for sharing.", _monitorKey);
+  }
+
+  // Converts a long to an int, clamping to the int range rather than truncating bits.
+  private static int saturateToInt(long value) {
+    if (value > Integer.MAX_VALUE) {
+      return Integer.MAX_VALUE;
+    }
+    if (value < Integer.MIN_VALUE) {
+      return Integer.MIN_VALUE;
+    }
+    return (int) value;
+  }
+
+  /**
+   * Register event watcher.
+   *
+   * @param watcher the watcher that wants to receive forwarded events
+   * @return true if the watcher is newly added. false if it is already in record.
+   * @throws HelixException if this manager has already been closed
+   */
+  protected synchronized boolean registerWatcher(Watcher watcher) {
+    if (isClosed()) {
+      throw new HelixException("Cannot add watcher to a closed client.");
+    }
+    return _sharedWatchers.add(watcher);
+  }
+
+  /**
+   * Unregister the event watcher.
+   *
+   * @param watcher the watcher to drop
+   * @return number of the remaining event watchers
+   */
+  protected synchronized int unregisterWatcher(Watcher watcher) {
+    _sharedWatchers.remove(watcher);
+    return _sharedWatchers.size();
+  }
+
+  /**
+   * Lets the parent client handle the event first, then forwards it to the registered
+   * watchers.
+   */
+  @Override
+  public void process(final WatchedEvent event) {
+    super.process(event);
+    forwardingEvent(event);
+  }
+
+  private synchronized void forwardingEvent(final WatchedEvent event) {
+    // process() (and therefore this method) can be triggered during construction, before the
+    // field initializer has run, so _sharedWatchers may still be null here.
+    if (_sharedWatchers == null || _sharedWatchers.isEmpty()) {
+      return;
+    }
+    // forward event to all the watchers' event queue
+    for (final Watcher watcher : _sharedWatchers) {
+      watcher.process(event);
+    }
+  }
+
+  /**
+   * Enforce closing; if any watcher is still registered, an exception is thrown.
+   *
+   * @throws HelixException if watchers are still registered
+   */
+  @Override
+  public void close() {
+    close(false);
+  }
+
+  /**
+   * Closes the managed connection unless watchers are still registered. Watchers that are
+   * closed SharedZkClients are dropped before the check.
+   *
+   * @param skipIfWatched when true, return quietly if watchers remain; when false, throw
+   * @throws HelixException if watchers remain and {@code skipIfWatched} is false
+   */
+  protected synchronized void close(boolean skipIfWatched) {
+    cleanupInactiveWatchers();
+    if (!_sharedWatchers.isEmpty()) {
+      if (!skipIfWatched) {
+        throw new HelixException(
+            "Cannot close the connection when there are still shared watchers listen on the event.");
+      }
+      LOG.debug("Skip closing ZkConnection due to existing watchers. Watcher count {}.",
+          _sharedWatchers.size());
+      return;
+    }
+    super.close();
+    LOG.info("ZkConnection {} was closed.", _monitorKey);
+  }
+
+  // Drops every registered watcher that is a SharedZkClient reporting itself as closed.
+  // Invoked from the synchronized close(boolean).
+  private void cleanupInactiveWatchers() {
+    Set<Watcher> closedWatchers = new HashSet<>();
+    for (Watcher watcher : _sharedWatchers) {
+      // TODO ideally, we shall have a ClosableWatcher interface so as to check accordingly. -- JJ
+      if (watcher instanceof SharedZkClient && ((SharedZkClient) watcher).isClosed()) {
+        closedWatchers.add(watcher);
+      }
+    }
+    // Collected first, removed afterwards, so the set is not modified while being iterated.
+    _sharedWatchers.removeAll(closedWatchers);
+  }
+}