You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by hu...@apache.org on 2020/04/01 22:47:24 UTC
[helix] 29/49: Add FederatedZkClient (#789)
This is an automated email from the ASF dual-hosted git repository.
hulee pushed a commit to branch zooscalability
in repository https://gitbox.apache.org/repos/asf/helix.git
commit 4b28762b23f2d4ec81eca0f84c9a03c61e476cdb
Author: Huizhi Lu <ih...@gmail.com>
AuthorDate: Sat Feb 29 09:45:23 2020 -0800
Add FederatedZkClient (#789)
As part of ZkClient API enhancement, we wish to add FederatedZkClient, which is a wrapper of the raw ZkClient, that provides realm-aware access to ZooKeeper.
FederatedZkClient will internally maintain multiple ZooKeeper sessions connecting to different ZooKeeper realms on an as-needed basis and route requests to the appropriate ZooKeeper based on the ZK path sharding key. Ephemeral node creation is not supported.
---
.../zookeeper/impl/client/FederatedZkClient.java | 322 ++++++++++++++++-----
.../apache/helix/zookeeper/impl/ZkTestBase.java | 13 +-
.../impl/client/TestFederatedZkClient.java | 312 ++++++++++++++++++++
3 files changed, 573 insertions(+), 74 deletions(-)
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java
index 3925a6d..5f63408 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/FederatedZkClient.java
@@ -19,345 +19,533 @@ package org.apache.helix.zookeeper.impl.client;
* under the License.
*/
+import java.util.Iterator;
import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import org.apache.helix.msdcommon.datamodel.MetadataStoreRoutingData;
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
import org.apache.helix.zookeeper.zkclient.DataUpdater;
import org.apache.helix.zookeeper.zkclient.IZkChildListener;
import org.apache.helix.zookeeper.zkclient.IZkDataListener;
+import org.apache.helix.zookeeper.zkclient.ZkConnection;
import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks;
-import org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener;
+import org.apache.helix.zookeeper.zkclient.IZkStateListener;
+import org.apache.helix.zookeeper.zkclient.serialize.BasicZkSerializer;
import org.apache.helix.zookeeper.zkclient.serialize.PathBasedZkSerializer;
import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.OpResult;
+import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Implements and supports all ZK operations defined in interface {@link RealmAwareZkClient},
+ * except for session-aware operations such as creating ephemeral nodes, for which
+ * an {@link UnsupportedOperationException} will be thrown.
+ * <p>
+ * It acts as a single ZK client but will automatically route read/write/change subscription
+ * requests to the corresponding ZkClient with the help of metadata store directory service.
+ * It could connect to multiple ZK addresses and maintain a {@link ZkClient} for each ZK address.
+ * <p>
+ * Note: each ZK realm has its own event queue to handle listeners, so listeners from different ZK
+ * realms could be handled concurrently because listeners of a ZK realm are handled in its own
+ * queue. Implementers of listeners for different ZK realms should be aware of this concurrency.
+ * The users should use thread-safe data structures if they wish to handle change
+ * callbacks.
+ */
+public class FederatedZkClient implements RealmAwareZkClient {
+ private static final Logger LOG = LoggerFactory.getLogger(FederatedZkClient.class);
+ private static final String FEDERATED_ZK_CLIENT = FederatedZkClient.class.getSimpleName();
+ private static final String DEDICATED_ZK_CLIENT_FACTORY =
+ DedicatedZkClientFactory.class.getSimpleName();
+
+ private final MetadataStoreRoutingData _metadataStoreRoutingData;
+ private final RealmAwareZkClient.RealmAwareZkClientConfig _clientConfig;
+
+ // ZK realm -> ZkClient
+ private final Map<String, ZkClient> _zkRealmToZkClientMap;
+
+ private volatile boolean _isClosed;
+ private PathBasedZkSerializer _pathBasedZkSerializer;
+
+ // TODO: support capacity of ZkClient number in one FederatedZkClient and do garbage collection.
+ public FederatedZkClient(RealmAwareZkClient.RealmAwareZkClientConfig clientConfig,
+ MetadataStoreRoutingData metadataStoreRoutingData) {
+ if (metadataStoreRoutingData == null) {
+ throw new IllegalArgumentException("MetadataStoreRoutingData cannot be null!");
+ }
+ if (clientConfig == null) {
+ throw new IllegalArgumentException("Client config cannot be null!");
+ }
+
+ _isClosed = false;
+ _clientConfig = clientConfig;
+ _pathBasedZkSerializer = clientConfig.getZkSerializer();
+ _metadataStoreRoutingData = metadataStoreRoutingData;
+ _zkRealmToZkClientMap = new ConcurrentHashMap<>();
+ }
-public class FederatedZkClient implements RealmAwareZkClient {
@Override
public List<String> subscribeChildChanges(String path, IZkChildListener listener) {
- return null;
+ return getZkClient(path).subscribeChildChanges(path, listener);
}
@Override
public void unsubscribeChildChanges(String path, IZkChildListener listener) {
-
+ getZkClient(path).unsubscribeChildChanges(path, listener);
}
@Override
public void subscribeDataChanges(String path, IZkDataListener listener) {
-
+ getZkClient(path).subscribeDataChanges(path, listener);
}
@Override
public void unsubscribeDataChanges(String path, IZkDataListener listener) {
-
+ getZkClient(path).unsubscribeDataChanges(path, listener);
}
@Override
public void subscribeStateChanges(IZkStateListener listener) {
-
+ throwUnsupportedOperationException();
}
@Override
public void unsubscribeStateChanges(IZkStateListener listener) {
+ throwUnsupportedOperationException();
+ }
+ @Override
+ public void subscribeStateChanges(
+ org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener listener) {
+ throwUnsupportedOperationException();
}
@Override
- public void unsubscribeAll() {
+ public void unsubscribeStateChanges(
+ org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener listener) {
+ throwUnsupportedOperationException();
+ }
+ @Override
+ public void unsubscribeAll() {
+ _zkRealmToZkClientMap.values().forEach(ZkClient::unsubscribeAll);
}
@Override
public void createPersistent(String path) {
-
+ createPersistent(path, false);
}
@Override
public void createPersistent(String path, boolean createParents) {
-
+ createPersistent(path, createParents, ZooDefs.Ids.OPEN_ACL_UNSAFE);
}
@Override
public void createPersistent(String path, boolean createParents, List<ACL> acl) {
-
+ getZkClient(path).createPersistent(path, createParents, acl);
}
@Override
public void createPersistent(String path, Object data) {
-
+ create(path, data, CreateMode.PERSISTENT);
}
@Override
public void createPersistent(String path, Object data, List<ACL> acl) {
-
+ create(path, data, acl, CreateMode.PERSISTENT);
}
@Override
public String createPersistentSequential(String path, Object data) {
- return null;
+ return create(path, data, CreateMode.PERSISTENT_SEQUENTIAL);
}
@Override
public String createPersistentSequential(String path, Object data, List<ACL> acl) {
- return null;
+ return create(path, data, acl, CreateMode.PERSISTENT_SEQUENTIAL);
}
@Override
public void createEphemeral(String path) {
-
+ create(path, null, CreateMode.EPHEMERAL);
}
@Override
public void createEphemeral(String path, String sessionId) {
-
+ createEphemeral(path, null, sessionId);
}
@Override
public void createEphemeral(String path, List<ACL> acl) {
-
+ create(path, null, acl, CreateMode.EPHEMERAL);
}
@Override
public void createEphemeral(String path, List<ACL> acl, String sessionId) {
-
+ create(path, null, acl, CreateMode.EPHEMERAL, sessionId);
}
@Override
public String create(String path, Object data, CreateMode mode) {
- return null;
+ return create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, mode);
}
@Override
- public String create(String path, Object datat, List<ACL> acl, CreateMode mode) {
- return null;
+ public String create(String path, Object data, List<ACL> acl, CreateMode mode) {
+ return create(path, data, acl, mode, null);
}
@Override
public void createEphemeral(String path, Object data) {
-
+ create(path, data, CreateMode.EPHEMERAL);
}
@Override
public void createEphemeral(String path, Object data, String sessionId) {
-
+ create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, sessionId);
}
@Override
public void createEphemeral(String path, Object data, List<ACL> acl) {
-
+ create(path, data, acl, CreateMode.EPHEMERAL);
}
@Override
public void createEphemeral(String path, Object data, List<ACL> acl, String sessionId) {
-
+ create(path, data, acl, CreateMode.EPHEMERAL, sessionId);
}
@Override
public String createEphemeralSequential(String path, Object data) {
- return null;
+ return create(path, data, CreateMode.EPHEMERAL_SEQUENTIAL);
}
@Override
public String createEphemeralSequential(String path, Object data, List<ACL> acl) {
- return null;
+ return create(path, data, acl, CreateMode.EPHEMERAL_SEQUENTIAL);
}
@Override
public String createEphemeralSequential(String path, Object data, String sessionId) {
- return null;
+ return create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
+ sessionId);
}
@Override
public String createEphemeralSequential(String path, Object data, List<ACL> acl,
String sessionId) {
- return null;
+ return create(path, data, acl, CreateMode.EPHEMERAL_SEQUENTIAL, sessionId);
}
@Override
public List<String> getChildren(String path) {
- return null;
+ return getZkClient(path).getChildren(path);
}
@Override
public int countChildren(String path) {
- return 0;
+ return getZkClient(path).countChildren(path);
}
@Override
public boolean exists(String path) {
- return false;
+ return getZkClient(path).exists(path);
}
@Override
public Stat getStat(String path) {
- return null;
+ return getZkClient(path).getStat(path);
}
@Override
public boolean waitUntilExists(String path, TimeUnit timeUnit, long time) {
- return false;
+ return getZkClient(path).waitUntilExists(path, timeUnit, time);
}
@Override
public void deleteRecursively(String path) {
-
+ getZkClient(path).deleteRecursively(path);
}
@Override
public boolean delete(String path) {
- return false;
+ return getZkClient(path).delete(path);
}
@Override
+ @SuppressWarnings("unchecked")
public <T> T readData(String path) {
- return null;
+ return (T) readData(path, false);
}
@Override
public <T> T readData(String path, boolean returnNullIfPathNotExists) {
- return null;
+ return getZkClient(path).readData(path, returnNullIfPathNotExists);
}
@Override
public <T> T readData(String path, Stat stat) {
- return null;
+ return getZkClient(path).readData(path, stat);
}
@Override
public <T> T readData(String path, Stat stat, boolean watch) {
- return null;
+ return getZkClient(path).readData(path, stat, watch);
}
@Override
public <T> T readDataAndStat(String path, Stat stat, boolean returnNullIfPathNotExists) {
- return null;
+ // Delegate to readDataAndStat() rather than readData(path, stat, watch): the third argument
+ // here is returnNullIfPathNotExists and must not be passed on as the "watch" flag.
+ return getZkClient(path).readDataAndStat(path, stat, returnNullIfPathNotExists);
}
@Override
public void writeData(String path, Object object) {
-
+ writeData(path, object, -1);
}
@Override
public <T> void updateDataSerialized(String path, DataUpdater<T> updater) {
-
+ getZkClient(path).updateDataSerialized(path, updater);
}
@Override
- public void writeData(String path, Object datat, int expectedVersion) {
-
+ public void writeData(String path, Object data, int expectedVersion) {
+ writeDataReturnStat(path, data, expectedVersion);
}
@Override
- public Stat writeDataReturnStat(String path, Object datat, int expectedVersion) {
- return null;
+ public Stat writeDataReturnStat(String path, Object data, int expectedVersion) {
+ return getZkClient(path).writeDataReturnStat(path, data, expectedVersion);
}
@Override
- public Stat writeDataGetStat(String path, Object datat, int expectedVersion) {
- return null;
+ public Stat writeDataGetStat(String path, Object data, int expectedVersion) {
+ return writeDataReturnStat(path, data, expectedVersion);
}
@Override
- public void asyncCreate(String path, Object datat, CreateMode mode,
+ public void asyncCreate(String path, Object data, CreateMode mode,
ZkAsyncCallbacks.CreateCallbackHandler cb) {
-
+ getZkClient(path).asyncCreate(path, data, mode, cb);
}
@Override
- public void asyncSetData(String path, Object datat, int version,
+ public void asyncSetData(String path, Object data, int version,
ZkAsyncCallbacks.SetDataCallbackHandler cb) {
-
+ getZkClient(path).asyncSetData(path, data, version, cb);
}
@Override
public void asyncGetData(String path, ZkAsyncCallbacks.GetDataCallbackHandler cb) {
-
+ getZkClient(path).asyncGetData(path, cb);
}
@Override
public void asyncExists(String path, ZkAsyncCallbacks.ExistsCallbackHandler cb) {
-
+ getZkClient(path).asyncExists(path, cb);
}
@Override
public void asyncDelete(String path, ZkAsyncCallbacks.DeleteCallbackHandler cb) {
-
+ getZkClient(path).asyncDelete(path, cb);
}
@Override
public void watchForData(String path) {
-
+ getZkClient(path).watchForData(path);
}
@Override
public List<String> watchForChilds(String path) {
- return null;
+ return getZkClient(path).watchForChilds(path);
}
@Override
public long getCreationTime(String path) {
- return 0;
+ return getZkClient(path).getCreationTime(path);
}
@Override
public List<OpResult> multi(Iterable<Op> ops) {
+ throwUnsupportedOperationException();
return null;
}
@Override
public boolean waitUntilConnected(long time, TimeUnit timeUnit) {
+ throwUnsupportedOperationException();
return false;
}
@Override
public String getServers() {
+ throwUnsupportedOperationException();
return null;
}
@Override
public long getSessionId() {
- return 0;
+ // Session-aware is unsupported.
+ throwUnsupportedOperationException();
+ return 0L;
}
+ /**
+ * Closes this client and every per-realm ZkClient it has created. Calling it again after the
+ * client is closed returns immediately.
+ */
@Override
public void close() {
+ if (isClosed()) {
+ return;
+ }
+ // Set the flag before taking the lock so that checkClosedState() rejects requests from now on.
+ _isClosed = true;
+
+ // Same monitor that getZkClient() holds while creating a ZkClient, so no new ZkClient is
+ // added to the map while it is being drained here.
+ synchronized (_zkRealmToZkClientMap) {
+ Iterator<Map.Entry<String, ZkClient>> iterator = _zkRealmToZkClientMap.entrySet().iterator();
+
+ while (iterator.hasNext()) {
+ Map.Entry<String, ZkClient> entry = iterator.next();
+ String zkRealm = entry.getKey();
+ ZkClient zkClient = entry.getValue();
+
+ // Catch any exception thrown by this ZkClient's close() so that the remaining ZkClients
+ // are still closed and not leaked.
+ try {
+ zkClient.close();
+ } catch (Exception e) {
+ LOG.error("Exception thrown when closing ZkClient for ZkRealm: {}!", zkRealm, e);
+ }
+ // Remove the entry whether or not close() succeeded.
+ iterator.remove();
+ }
+ }
+
+ LOG.info("{} is successfully closed.", FEDERATED_ZK_CLIENT);
}
@Override
public boolean isClosed() {
- return false;
+ return _isClosed;
}
@Override
public byte[] serialize(Object data, String path) {
- return new byte[0];
+ return getZkClient(path).serialize(data, path);
}
@Override
public <T> T deserialize(byte[] data, String path) {
- return null;
+ return getZkClient(path).deserialize(data, path);
}
@Override
public void setZkSerializer(ZkSerializer zkSerializer) {
-
+ _pathBasedZkSerializer = new BasicZkSerializer(zkSerializer);
+ _zkRealmToZkClientMap.values()
+ .forEach(zkClient -> zkClient.setZkSerializer(_pathBasedZkSerializer));
}
@Override
public void setZkSerializer(PathBasedZkSerializer zkSerializer) {
-
+ _pathBasedZkSerializer = zkSerializer;
+ _zkRealmToZkClientMap.values().forEach(zkClient -> zkClient.setZkSerializer(zkSerializer));
}
@Override
public PathBasedZkSerializer getZkSerializer() {
- return null;
+ return _pathBasedZkSerializer;
+ }
+
+ /**
+ * Common entry point for all create calls: rejects ephemeral create modes and routes every
+ * other mode to the ZkClient of the realm that owns the path.
+ *
+ * @param path ZK path of the node to create
+ * @param dataObject data to store in the node
+ * @param acl ACL to apply to the node
+ * @param mode create mode; ephemeral modes are not supported
+ * @param expectedSessionId not used by this method (see the comment in the body)
+ * @return the value returned by the underlying ZkClient's create()
+ * @throws UnsupportedOperationException if the create mode is ephemeral
+ */
+ private String create(final String path, final Object dataObject, final List<ACL> acl,
+ final CreateMode mode, final String expectedSessionId) {
+ if (mode.isEphemeral()) {
+ throwUnsupportedOperationException();
+ }
+
+ // Create mode is not session-aware, so the node does not have to be created
+ // by the expectedSessionId.
+ return getZkClient(path).create(path, dataObject, acl, mode);
+ }
+
+ /**
+ * Returns the ZkClient for the realm that the given path routes to, creating and caching it on
+ * first use.
+ *
+ * @param path ZK path used to look up the realm
+ * @return the cached or newly created ZkClient for the path's realm
+ * @throws IllegalStateException if this FederatedZkClient is closed
+ * @throws NoSuchElementException if no realm can be found for the path
+ */
+ private ZkClient getZkClient(String path) {
+ // If FederatedZkClient is closed, it should not return a ZkClient.
+ checkClosedState();
+
+ String zkRealm = getZkRealm(path);
+
+ // Hold the looked-up ZkClient in a local reference to protect the returned value from being
+ // null because of a race condition. Once we have the reference, even if
+ // _zkRealmToZkClientMap is cleared by close(), this reference stays non-null.
+ ZkClient zkClient = _zkRealmToZkClientMap.get(zkRealm);
+
+ if (zkClient == null) {
+ // 1. Synchronized to avoid creating duplicate ZkClients for the same ZkRealm.
+ // 2. Synchronized with close() to avoid creating a new ZkClient while all ZkClients are
+ // being closed and _zkRealmToZkClientMap is being cleared.
+ synchronized (_zkRealmToZkClientMap) {
+ // Potential race condition: thread B, trying to get a ZkClient, could be blocked on this
+ // lock while thread A is executing close() inside its synchronized block. Thread B
+ // could then enter this synchronized block once A finishes close() and
+ // releases the lock.
+ // Check the closed state again to avoid creating a new ZkClient after FederatedZkClient
+ // is already closed.
+ checkClosedState();
+
+ if (!_zkRealmToZkClientMap.containsKey(zkRealm)) {
+ zkClient = createZkClient(zkRealm);
+ _zkRealmToZkClientMap.put(zkRealm, zkClient);
+ } else {
+ // Another thread created the ZkClient while this thread was waiting for the lock.
+ zkClient = _zkRealmToZkClientMap.get(zkRealm);
+ }
+ }
+ }
+
+ return zkClient;
+ }
+
+ /**
+ * Resolves the ZK realm that the given path is routed to.
+ *
+ * @param path ZK path to look up in the routing data
+ * @return the non-empty realm returned by the routing data
+ * @throws NoSuchElementException if the routing data has no realm for the path, or returns a
+ * null or empty realm
+ */
+ private String getZkRealm(String path) {
+ String zkRealm;
+ try {
+ zkRealm = _metadataStoreRoutingData.getMetadataStoreRealm(path);
+ } catch (NoSuchElementException ex) {
+ // Rethrow with the uniform message, but keep the original exception as the cause so the
+ // underlying routing failure stays visible in stack traces.
+ NoSuchElementException wrapped =
+ new NoSuchElementException("Cannot find ZK realm for the path: " + path);
+ wrapped.initCause(ex);
+ throw wrapped;
+ }
+
+ if (zkRealm == null || zkRealm.isEmpty()) {
+ throw new NoSuchElementException("Cannot find ZK realm for the path: " + path);
+ }
+
+ return zkRealm;
+ }
+
+ /**
+ * Builds a new ZkClient connected to the given address, taking the timeouts and monitoring
+ * settings from the client config and using the current path-based serializer.
+ *
+ * @param zkAddress address passed to ZkConnection; getZkClient() passes the realm name here
+ * @return a newly constructed ZkClient
+ */
+ private ZkClient createZkClient(String zkAddress) {
+ LOG.debug("Creating ZkClient for realm: {}.", zkAddress);
+ // NOTE(review): the connect-init timeout is narrowed with an (int) cast; confirm that
+ // configured values always fit in an int.
+ return new ZkClient(new ZkConnection(zkAddress), (int) _clientConfig.getConnectInitTimeout(),
+ _clientConfig.getOperationRetryTimeout(), _pathBasedZkSerializer,
+ _clientConfig.getMonitorType(), _clientConfig.getMonitorKey(),
+ _clientConfig.getMonitorInstanceName(), _clientConfig.isMonitorRootPathOnly());
+ }
+
+ private void checkClosedState() {
+ if (isClosed()) {
+ throw new IllegalStateException(FEDERATED_ZK_CLIENT + " is closed!");
+ }
+ }
+
+ private void throwUnsupportedOperationException() {
+ throw new UnsupportedOperationException(
+ "Session-aware operation is not supported by " + FEDERATED_ZK_CLIENT
+ + ". Instead, please use " + DEDICATED_ZK_CLIENT_FACTORY
+ + " to create a dedicated RealmAwareZkClient for this operation.");
}
}
diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/ZkTestBase.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/ZkTestBase.java
index 10edaf4..7e59652 100644
--- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/ZkTestBase.java
+++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/ZkTestBase.java
@@ -57,12 +57,11 @@ public class ZkTestBase {
* Multiple ZK references
*/
// The following maps hold ZK connect string as keys
- protected Map<String, ZkServer> _zkServerMap = new HashMap<>();
- protected int _numZk = 1; // Initial value
+ protected final Map<String, ZkServer> _zkServerMap = new HashMap<>();
+ protected static int _numZk = 1; // Initial value
@BeforeSuite
- public void beforeSuite()
- throws IOException {
+ public void beforeSuite() throws IOException {
// Due to ZOOKEEPER-2693 fix, we need to specify whitelist for execute zk commends
System.setProperty("zookeeper.4lw.commands.whitelist", "*");
@@ -80,8 +79,7 @@ public class ZkTestBase {
}
@AfterSuite
- public void afterSuite()
- throws IOException {
+ public void afterSuite() throws IOException {
// Clean up all JMX objects
for (ObjectName mbean : MBEAN_SERVER.queryNames(null, null)) {
try {
@@ -124,7 +122,7 @@ public class ZkTestBase {
* @param zkAddress
* @return
*/
- private ZkServer startZkServer(final String zkAddress) {
+ protected ZkServer startZkServer(final String zkAddress) {
String zkDir = zkAddress.replace(':', '_');
final String logDir = "/tmp/" + zkDir + "/logs";
final String dataDir = "/tmp/" + zkDir + "/dataDir";
@@ -142,6 +140,7 @@ public class ZkTestBase {
int port = Integer.parseInt(zkAddress.substring(zkAddress.lastIndexOf(':') + 1));
ZkServer zkServer = new ZkServer(dataDir, logDir, defaultNameSpace, port);
+ System.out.println("Starting ZK server at " + zkAddress);
zkServer.start();
return zkServer;
}
diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestFederatedZkClient.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestFederatedZkClient.java
new file mode 100644
index 0000000..5801690
--- /dev/null
+++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestFederatedZkClient.java
@@ -0,0 +1,312 @@
+package org.apache.helix.zookeeper.impl.client;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.msdcommon.datamodel.TrieRoutingData;
+import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
+import org.apache.helix.zookeeper.impl.ZkTestBase;
+import org.apache.helix.zookeeper.zkclient.IZkStateListener;
+import org.apache.helix.zookeeper.zkclient.ZkServer;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestFederatedZkClient extends ZkTestBase {
+ private static final String TEST_SHARDING_KEY_PREFIX = "/test_sharding_key_";
+ private static final String TEST_REALM_ONE_VALID_PATH = TEST_SHARDING_KEY_PREFIX + "1/a/b/c";
+ private static final String TEST_REALM_TWO_VALID_PATH = TEST_SHARDING_KEY_PREFIX + "2/x/y/z";
+ private static final String TEST_INVALID_PATH = TEST_SHARDING_KEY_PREFIX + "invalid/a/b/c";
+ private static final String UNSUPPORTED_OPERATION_MESSAGE =
+ "Session-aware operation is not supported by FederatedZkClient.";
+
+ private RealmAwareZkClient _realmAwareZkClient;
+ // Need to start an extra ZK server for multi-realm test, if only one ZK server is running.
+ private String _extraZkRealm;
+ private ZkServer _extraZkServer;
+
+ @BeforeClass
+ public void beforeClass() throws InvalidRoutingDataException {
+ System.out.println("Starting " + TestFederatedZkClient.class.getSimpleName());
+
+ // Populate rawRoutingData
+ // <Realm, List of sharding keys> Mapping
+ Map<String, List<String>> rawRoutingData = new HashMap<>();
+ for (int i = 0; i < _numZk; i++) {
+ List<String> shardingKeyList = Collections.singletonList(TEST_SHARDING_KEY_PREFIX + (i + 1));
+ String realmName = ZK_PREFIX + (ZK_START_PORT + i);
+ rawRoutingData.put(realmName, shardingKeyList);
+ }
+
+ if (rawRoutingData.size() < 2) {
+ System.out.println("There is only one ZK realm. Starting one more ZK to test multi-realm.");
+ _extraZkRealm = ZK_PREFIX + (ZK_START_PORT + 1);
+ _extraZkServer = startZkServer(_extraZkRealm);
+ // RealmTwo's sharding key: /test_sharding_key_2
+ List<String> shardingKeyList = Collections.singletonList(TEST_SHARDING_KEY_PREFIX + "2");
+ rawRoutingData.put(_extraZkRealm, shardingKeyList);
+ }
+
+ // Feed the raw routing data into TrieRoutingData to construct an in-memory representation
+ // of routing information.
+ _realmAwareZkClient = new FederatedZkClient(new RealmAwareZkClient.RealmAwareZkClientConfig(),
+ new TrieRoutingData(rawRoutingData));
+ }
+
+ @AfterClass
+ public void afterClass() {
+ // Close it as it is created in before class.
+ _realmAwareZkClient.close();
+
+ // Close the extra zk server.
+ if (_extraZkServer != null) {
+ _extraZkServer.shutdown();
+ }
+
+ System.out.println("Ending " + TestFederatedZkClient.class.getSimpleName());
+ }
+
+ /*
+ * Tests that an unsupported operation should throw an UnsupportedOperationException.
+ * Each failure message names the operation under test so a failing run points at the right call.
+ */
+ @Test
+ public void testUnsupportedOperations() {
+ // Test creating ephemeral.
+ try {
+ _realmAwareZkClient.create(TEST_REALM_ONE_VALID_PATH, "Hello", CreateMode.EPHEMERAL);
+ Assert.fail("Ephemeral node should not be created.");
+ } catch (UnsupportedOperationException ex) {
+ Assert.assertTrue(ex.getMessage().startsWith(UNSUPPORTED_OPERATION_MESSAGE));
+ }
+
+ // Test creating ephemeral sequential.
+ try {
+ _realmAwareZkClient
+ .create(TEST_REALM_ONE_VALID_PATH, "Hello", CreateMode.EPHEMERAL_SEQUENTIAL);
+ Assert.fail("Ephemeral node should not be created.");
+ } catch (UnsupportedOperationException ex) {
+ Assert.assertTrue(ex.getMessage().startsWith(UNSUPPORTED_OPERATION_MESSAGE));
+ }
+
+ List<Op> ops = Arrays.asList(
+ Op.create(TEST_REALM_ONE_VALID_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT), Op.delete(TEST_REALM_ONE_VALID_PATH, -1));
+ try {
+ _realmAwareZkClient.multi(ops);
+ Assert.fail("multi() should not be supported.");
+ } catch (UnsupportedOperationException ex) {
+ Assert.assertTrue(ex.getMessage().startsWith(UNSUPPORTED_OPERATION_MESSAGE));
+ }
+
+ try {
+ _realmAwareZkClient.getSessionId();
+ Assert.fail("getSessionId() should not be supported.");
+ } catch (UnsupportedOperationException ex) {
+ Assert.assertTrue(ex.getMessage().startsWith(UNSUPPORTED_OPERATION_MESSAGE));
+ }
+
+ try {
+ _realmAwareZkClient.getServers();
+ Assert.fail("getServers() should not be supported.");
+ } catch (UnsupportedOperationException ex) {
+ Assert.assertTrue(ex.getMessage().startsWith(UNSUPPORTED_OPERATION_MESSAGE));
+ }
+
+ try {
+ _realmAwareZkClient.waitUntilConnected(5L, TimeUnit.SECONDS);
+ Assert.fail("waitUntilConnected() should not be supported.");
+ } catch (UnsupportedOperationException ex) {
+ Assert.assertTrue(ex.getMessage().startsWith(UNSUPPORTED_OPERATION_MESSAGE));
+ }
+
+ // Test state change subscription.
+ IZkStateListener listener = new IZkStateListener() {
+ @Override
+ public void handleStateChanged(Watcher.Event.KeeperState state) {
+ System.out.println("Handle new state: " + state);
+ }
+
+ @Override
+ public void handleNewSession(String sessionId) {
+ System.out.println("Handle new session: " + sessionId);
+ }
+
+ @Override
+ public void handleSessionEstablishmentError(Throwable error) {
+ System.out.println("Handle session establishment error: " + error);
+ }
+ };
+
+ try {
+ _realmAwareZkClient.subscribeStateChanges(listener);
+ Assert.fail("subscribeStateChanges() should not be supported.");
+ } catch (UnsupportedOperationException ex) {
+ Assert.assertTrue(ex.getMessage().startsWith(UNSUPPORTED_OPERATION_MESSAGE));
+ }
+
+ try {
+ _realmAwareZkClient.unsubscribeStateChanges(listener);
+ Assert.fail("unsubscribeStateChanges() should not be supported.");
+ } catch (UnsupportedOperationException ex) {
+ Assert.assertTrue(ex.getMessage().startsWith(UNSUPPORTED_OPERATION_MESSAGE));
+ }
+ }
+
+ /*
+ * Tests the persistent create() call against a valid path and an invalid path.
+ * Valid path is one that belongs to the realm designated by the sharding key.
+ * Invalid path is one that does not belong to the realm designated by the sharding key.
+ */
+ @Test(dependsOnMethods = "testUnsupportedOperations")
+ public void testCreatePersistent() {
+ _realmAwareZkClient.setZkSerializer(new ZNRecordSerializer());
+
+ // Create a dummy ZNRecord
+ ZNRecord znRecord = new ZNRecord("DummyRecord");
+ znRecord.setSimpleField("Dummy", "Value");
+
+ // Test writing and reading against the validPath
+ _realmAwareZkClient.createPersistent(TEST_REALM_ONE_VALID_PATH, true);
+ _realmAwareZkClient.writeData(TEST_REALM_ONE_VALID_PATH, znRecord);
+ Assert.assertEquals(_realmAwareZkClient.readData(TEST_REALM_ONE_VALID_PATH), znRecord);
+
+ // Test writing and reading against the invalid path
+ try {
+ _realmAwareZkClient.createPersistent(TEST_INVALID_PATH, true);
+ Assert.fail("Create() should not succeed on an invalid path!");
+ } catch (NoSuchElementException ex) {
+ Assert
+ .assertEquals(ex.getMessage(), "Cannot find ZK realm for the path: " + TEST_INVALID_PATH);
+ }
+ }
+
+ /*
+ * Tests that exists() works on valid path and fails on invalid path.
+ */
+ @Test(dependsOnMethods = "testCreatePersistent")
+ public void testExists() {
+ Assert.assertTrue(_realmAwareZkClient.exists(TEST_REALM_ONE_VALID_PATH));
+
+ try {
+ _realmAwareZkClient.exists(TEST_INVALID_PATH);
+ Assert.fail("Exists() should not succeed on an invalid path!");
+ } catch (NoSuchElementException ex) {
+ Assert
+ .assertEquals(ex.getMessage(), "Cannot find ZK realm for the path: " + TEST_INVALID_PATH);
+ }
+ }
+
+ /*
+ * Tests that delete() works on valid path and fails on invalid path.
+ * The invalid path is checked first; the valid path is then deleted and verified to be gone.
+ */
+ @Test(dependsOnMethods = "testExists")
+ public void testDelete() {
+ try {
+ _realmAwareZkClient.delete(TEST_INVALID_PATH);
+ Assert.fail("Delete() should not succeed on an invalid path!");
+ } catch (NoSuchElementException ex) {
+ Assert
+ .assertEquals(ex.getMessage(), "Cannot find ZK realm for the path: " + TEST_INVALID_PATH);
+ }
+
+ Assert.assertTrue(_realmAwareZkClient.delete(TEST_REALM_ONE_VALID_PATH));
+ Assert.assertFalse(_realmAwareZkClient.exists(TEST_REALM_ONE_VALID_PATH));
+ }
+
+ /*
+ * Tests the multi-realm feature: CRUD operations on one realm must not affect the other realm.
+ */
+ @Test(dependsOnMethods = "testDelete")
+ public void testMultiRealmCRUD() {
+ ZNRecord realmOneZnRecord = new ZNRecord("realmOne");
+ realmOneZnRecord.setSimpleField("realmOne", "Value");
+
+ ZNRecord realmTwoZnRecord = new ZNRecord("realmTwo");
+ realmTwoZnRecord.setSimpleField("realmTwo", "Value");
+
+ // Writing on realmOne.
+ _realmAwareZkClient.createPersistent(TEST_REALM_ONE_VALID_PATH, true);
+ _realmAwareZkClient.writeData(TEST_REALM_ONE_VALID_PATH, realmOneZnRecord);
+
+ // RealmOne path is created but realmTwo path is not.
+ Assert.assertTrue(_realmAwareZkClient.exists(TEST_REALM_ONE_VALID_PATH));
+ Assert.assertFalse(_realmAwareZkClient.exists(TEST_REALM_TWO_VALID_PATH));
+
+ // Writing on realmTwo.
+ _realmAwareZkClient.createPersistent(TEST_REALM_TWO_VALID_PATH, true);
+ _realmAwareZkClient.writeData(TEST_REALM_TWO_VALID_PATH, realmTwoZnRecord);
+
+ // RealmTwo path is created.
+ Assert.assertTrue(_realmAwareZkClient.exists(TEST_REALM_TWO_VALID_PATH));
+
+ // Reading on both realms.
+ Assert.assertEquals(_realmAwareZkClient.readData(TEST_REALM_ONE_VALID_PATH), realmOneZnRecord);
+ Assert.assertEquals(_realmAwareZkClient.readData(TEST_REALM_TWO_VALID_PATH), realmTwoZnRecord);
+
+ Assert.assertTrue(_realmAwareZkClient.delete(TEST_REALM_ONE_VALID_PATH));
+ Assert.assertFalse(_realmAwareZkClient.exists(TEST_REALM_ONE_VALID_PATH));
+
+ // Deleting on realmOne does not delete on realmTwo.
+ Assert.assertTrue(_realmAwareZkClient.exists(TEST_REALM_TWO_VALID_PATH));
+
+ // Deleting on realmTwo.
+ Assert.assertTrue(_realmAwareZkClient.delete(TEST_REALM_TWO_VALID_PATH));
+ Assert.assertFalse(_realmAwareZkClient.exists(TEST_REALM_TWO_VALID_PATH));
+ }
+
+ /*
+ * Tests that close() works.
+ * TODO: test that all raw zkClients are closed after FederatedZkClient close() is called. This
+ * could help avoid ZkClient leakage.
+ */
+ @Test(dependsOnMethods = "testMultiRealmCRUD")
+ public void testClose() {
+ Assert.assertFalse(_realmAwareZkClient.isClosed());
+
+ _realmAwareZkClient.close();
+
+ Assert.assertTrue(_realmAwareZkClient.isClosed());
+
+ // Client is closed, so operation should not be executed.
+ try {
+ _realmAwareZkClient.createPersistent(TEST_REALM_ONE_VALID_PATH);
+ Assert
+ .fail("createPersistent() should not be executed because RealmAwareZkClient is closed.");
+ } catch (IllegalStateException ex) {
+ Assert.assertEquals(ex.getMessage(), "FederatedZkClient is closed!");
+ }
+ }
+}