You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by xy...@apache.org on 2023/02/22 22:33:27 UTC
[helix] branch metaclient updated: Implement zk Meta client async crud (#2354)
This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch metaclient
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/metaclient by this push:
new 798ddbdf7 Implement zk Meta client async crud (#2354)
798ddbdf7 is described below
commit 798ddbdf718690e5786cb3506f4c309c6dcc63bc
Author: xyuanlu <xy...@gmail.com>
AuthorDate: Wed Feb 22 14:33:21 2023 -0800
Implement zk Meta client async crud (#2354)
Implement zk Meta client async crud
---
.../apache/helix/metaclient/api/AsyncCallback.java | 2 +-
.../helix/metaclient/api/MetaClientInterface.java | 18 ++
.../helix/metaclient/impl/zk/ZkMetaClient.java | 88 +++++---
.../zk/adapter/ZkMetaClientSetCallbackHandler.java | 42 ++++
.../helix/metaclient/impl/zk/TestZkMetaClient.java | 67 +-----
.../impl/zk/TestZkMetaClientAsyncOperations.java | 233 +++++++++++++++++++++
.../metaclient/impl/zk/ZkMetaClientTestBase.java | 91 ++++++++
7 files changed, 457 insertions(+), 84 deletions(-)
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/api/AsyncCallback.java b/meta-client/src/main/java/org/apache/helix/metaclient/api/AsyncCallback.java
index 1e7c1eb75..ae7aed59b 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/api/AsyncCallback.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/api/AsyncCallback.java
@@ -28,8 +28,8 @@ import javax.annotation.Nullable;
* The corresponding callback is registered when async CRUD API is invoked. Implementation processes
* the result of each CRUD call. It should check return code and perform accordingly.
*/
-// TODO: define return code. failure code should map to MetaClient exceptions.
public interface AsyncCallback {
+
//This callback is used when stat object is returned from the operation.
interface StatCallback extends AsyncCallback {
/**
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java
index a4c5113f2..af9a170b4 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java
@@ -515,5 +515,23 @@ public interface MetaClientInterface<T> {
*/
boolean waitUntilExists(String key, TimeUnit timeUnit, long timeOut);
+ /**
+ * Serialize the data in type T to a byte array. This function can be used in API that returns or
+ * has input value in byte array format.
+ * @param data to be serialized.
+ * @param path timeout unit
+ * @return
+ */
+ byte[] serialize(T data, String path);
+
+ /**
+ * Serialize a byte array to data in type T. This function can be used in API that returns or
+ * has input value in byte array format.
+ * @param bytes to be deserialized.
+ * @param path timeout unit
+ * @return
+ */
+ T deserialize(byte[] bytes, String path);
+
// TODO: Secure CRUD APIs
}
\ No newline at end of file
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
index 875eb6a50..f520e319b 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
@@ -36,6 +36,11 @@ import org.apache.helix.metaclient.api.OpResult;
import org.apache.helix.metaclient.exception.MetaClientException;
import org.apache.helix.metaclient.impl.zk.adapter.DataListenerAdapter;
import org.apache.helix.metaclient.impl.zk.adapter.DirectChildListenerAdapter;
+import org.apache.helix.metaclient.impl.zk.adapter.ZkMetaClientCreateCallbackHandler;
+import org.apache.helix.metaclient.impl.zk.adapter.ZkMetaClientDeleteCallbackHandler;
+import org.apache.helix.metaclient.impl.zk.adapter.ZkMetaClientExistCallbackHandler;
+import org.apache.helix.metaclient.impl.zk.adapter.ZkMetaClientGetCallbackHandler;
+import org.apache.helix.metaclient.impl.zk.adapter.ZkMetaClientSetCallbackHandler;
import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
import org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil;
import org.apache.helix.zookeeper.api.client.ChildrenSubscribeResult;
@@ -51,7 +56,7 @@ import static org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil.convertZ
import static org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil.translateZkExceptionToMetaclientException;
public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
- private static final Logger LOG = LoggerFactory.getLogger(ZkMetaClient.class);
+ private static final Logger LOG = LoggerFactory.getLogger(ZkMetaClient.class);
private final ZkClient _zkClient;
private final int _connectionTimeout;
@@ -93,7 +98,7 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
}
@Override
- public T update( String key, DataUpdater<T> updater) {
+ public T update(String key, DataUpdater<T> updater) {
org.apache.zookeeper.data.Stat stat = new org.apache.zookeeper.data.Stat();
// TODO: add retry logic for ZkBadVersionException.
try {
@@ -161,63 +166,76 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
// thread. In our first version of implementation, we will keep similar behavior and have
// callbacks executed in ZkClient event thread, and reuse zkclient retry logic.
- // It is highly recommended NOT to perform any blocking operation inside the callbacks.
+ // It is highly recommended *NOT* to perform any blocking operation inside the callbacks.
// If you block the thread the meta client won't process other events.
// corresponding callbacks for each operation are invoked in order.
@Override
public void setAsyncExecPoolSize(int poolSize) {
-
+ throw new UnsupportedOperationException(
+ "All async calls are executed in a single thread to maintain sequence.");
}
@Override
public void asyncCreate(String key, Object data, EntryMode mode, AsyncCallback.VoidCallback cb) {
-
- }
-
- @Override
- public void asyncSet(String key, T data, int version, AsyncCallback.StatCallback cb) {
-
+ CreateMode entryMode;
+ try {
+ entryMode = ZkMetaClientUtil.convertMetaClientMode(mode);
+ } catch (ZkException | KeeperException e) {
+ throw new MetaClientException(e);
+ }
+ _zkClient.asyncCreate(key, data, entryMode,
+ new ZkMetaClientCreateCallbackHandler(cb));
}
@Override
- public void asyncUpdate(String key, DataUpdater updater, AsyncCallback.DataCallback cb) {
-
+ public void asyncUpdate(String key, DataUpdater<T> updater, AsyncCallback.DataCallback cb) {
+ throw new NotImplementedException("Currently asyncUpdate is not supported in ZkMetaClient.");
+ /*
+ * TODO: Only Helix has potential using this API as of now. (ZkBaseDataAccessor.update())
+ * Will move impl from ZkBaseDataAccessor to here when retiring ZkBaseDataAccessor.
+ */
}
@Override
public void asyncGet(String key, AsyncCallback.DataCallback cb) {
-
+ _zkClient.asyncGetData(key,
+ new ZkMetaClientGetCallbackHandler(cb));
}
@Override
public void asyncCountChildren(String key, AsyncCallback.DataCallback cb) {
+ throw new NotImplementedException(
+ "Currently asyncCountChildren is not supported in ZkMetaClient.");
+ /*
+ * TODO: Only Helix has potential using this API as of now. (ZkBaseDataAccessor.getChildren())
+ * Will move impl from ZkBaseDataAccessor to here when retiring ZkBaseDataAccessor.
+ */
}
@Override
public void asyncExist(String key, AsyncCallback.StatCallback cb) {
-
+ _zkClient.asyncExists(key,
+ new ZkMetaClientExistCallbackHandler(cb));
}
- @Override
- public void asyncDelete(String keys, AsyncCallback.VoidCallback cb) {
-
+ public void asyncDelete(String key, AsyncCallback.VoidCallback cb) {
+ _zkClient.asyncDelete(key, new ZkMetaClientDeleteCallbackHandler(cb));
}
@Override
- public boolean[] create(List key, List data, List mode) {
- return new boolean[0];
- }
+ public void asyncTransaction(Iterable<Op> ops, AsyncCallback.TransactionCallback cb) {
+ throw new NotImplementedException(
+ "Currently asyncTransaction is not supported in ZkMetaClient.");
- @Override
- public boolean[] create(List key, List data) {
- return new boolean[0];
+ //TODO: There is no active use case for Async transaction.
}
@Override
- public void asyncTransaction(Iterable iterable, AsyncCallback.TransactionCallback cb) {
-
+ public void asyncSet(String key, T data, int version, AsyncCallback.StatCallback cb) {
+ _zkClient.asyncSetData(key, data, version,
+ new ZkMetaClientSetCallbackHandler(cb));
}
@Override
@@ -290,6 +308,16 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
return false;
}
+ @Override
+ public boolean[] create(List key, List data, List mode) {
+ return new boolean[0];
+ }
+
+ @Override
+ public boolean[] create(List key, List data) {
+ return new boolean[0];
+ }
+
@Override
public boolean[] delete(List keys) {
return new boolean[0];
@@ -329,4 +357,14 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
// Convert list of Zk OpResults to MetaClient OpResults
return ZkMetaClientUtil.zkOpResultToMetaClientOpResults(zkResult);
}
+
+ @Override
+ public byte[] serialize(T data, String path) {
+ return _zkClient.serialize(data, path);
+ }
+
+ @Override
+ public T deserialize(byte[] bytes, String path) {
+ return _zkClient.deserialize(bytes, path);
+ }
}
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/ZkMetaClientSetCallbackHandler.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/ZkMetaClientSetCallbackHandler.java
new file mode 100644
index 000000000..60eecc7f8
--- /dev/null
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/ZkMetaClientSetCallbackHandler.java
@@ -0,0 +1,42 @@
+package org.apache.helix.metaclient.impl.zk.adapter;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.metaclient.api.AsyncCallback;
+import org.apache.helix.metaclient.api.MetaClientInterface;
+import org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil;
+import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks;
+
+
+public class ZkMetaClientSetCallbackHandler extends ZkAsyncCallbacks.SetDataCallbackHandler {
+ AsyncCallback.StatCallback userCallback;
+
+ public ZkMetaClientSetCallbackHandler(AsyncCallback.StatCallback cb) {
+ userCallback = cb;
+ }
+
+ @Override
+ public void handle() {
+ userCallback.processResult(getRc(), getPath(), getStat() == null ? null
+ : new MetaClientInterface.Stat(
+ ZkMetaClientUtil.convertZkEntryModeToMetaClientEntryMode(getStat().getEphemeralOwner()),
+ getStat().getVersion()));
+ }
+}
diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
index 70a7afccd..f23ed0f03 100644
--- a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
+++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
@@ -27,6 +27,13 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import org.apache.helix.metaclient.api.DataUpdater;
+import org.apache.helix.metaclient.api.MetaClientInterface;
+import org.apache.helix.metaclient.exception.MetaClientException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
import org.apache.commons.io.FileUtils;
import org.apache.helix.metaclient.api.DataUpdater;
import org.apache.helix.metaclient.api.DirectChildChangeListener;
@@ -55,7 +62,7 @@ import static org.apache.helix.metaclient.api.MetaClientInterface.EntryMode.CONT
import static org.apache.helix.metaclient.api.MetaClientInterface.EntryMode.PERSISTENT;
-public class TestZkMetaClient {
+public class TestZkMetaClient extends ZkMetaClientTestBase{
private static final String ZK_ADDR = "localhost:2183";
private static final int DEFAULT_TIMEOUT_MS = 1000;
@@ -65,31 +72,6 @@ public class TestZkMetaClient {
private final Object _syncObject = new Object();
-
- private ZkServer _zkServer;
-
- /**
- * Creates local Zk Server
- * Note: Cannot test container / TTL node end to end behavior as
- * the zk server setup doesn't allow for that. To enable this, zk server
- * setup must invoke ContainerManager.java. However, the actual
- * behavior has been verified to work on native ZK Client.
- * TODO: Modify zk server setup to include ContainerManager.
- * This can be done through ZooKeeperServerMain.java or
- * LeaderZooKeeperServer.java.
- */
- @BeforeClass
- public void prepare() {
- System.setProperty("zookeeper.extendedTypesEnabled", "true");
- // start local zookeeper server
- _zkServer = startZkServer(ZK_ADDR);
- }
-
- @AfterClass
- public void cleanUp() {
- _zkServer.shutdown();
- }
-
@Test
public void testCreate() {
final String key = "/TestZkMetaClient_testCreate";
@@ -115,7 +97,7 @@ public class TestZkMetaClient {
Assert.assertNotNull(zkMetaClient.exists(key));
}
}
-
+
@Test
public void testGet() {
final String key = "/TestZkMetaClient_testGet";
@@ -352,37 +334,6 @@ public class TestZkMetaClient {
}
}
- // TODO: Create a ZkMetadata test base class and move these helper to base class when more tests
- // are added.
- private static ZkMetaClient<String> createZkMetaClient() {
- ZkMetaClientConfig config =
- new ZkMetaClientConfig.ZkMetaClientConfigBuilder().setConnectionAddress(ZK_ADDR).build();
- return new ZkMetaClient<>(config);
- }
-
- private static ZkServer startZkServer(final String zkAddress) {
- String zkDir = zkAddress.replace(':', '_');
- final String logDir = "/tmp/" + zkDir + "/logs";
- final String dataDir = "/tmp/" + zkDir + "/dataDir";
-
- // Clean up local directory
- try {
- FileUtils.deleteDirectory(new File(dataDir));
- FileUtils.deleteDirectory(new File(logDir));
- } catch (IOException e) {
- e.printStackTrace();
- }
-
- IDefaultNameSpace defaultNameSpace = zkClient -> {
- };
-
- int port = Integer.parseInt(zkAddress.substring(zkAddress.lastIndexOf(':') + 1));
- System.out.println("Starting ZK server at " + zkAddress);
- ZkServer zkServer = new ZkServer(dataDir, logDir, defaultNameSpace, port);
- zkServer.start();
- return zkServer;
- }
-
/**
* Transactional op calls zk.multi() with a set of ops (operations)
* and the return values are converted into metaclient opResults.
diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClientAsyncOperations.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClientAsyncOperations.java
new file mode 100644
index 000000000..fbfd8ed35
--- /dev/null
+++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClientAsyncOperations.java
@@ -0,0 +1,233 @@
+package org.apache.helix.metaclient.impl.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nullable;
+
+import org.apache.helix.metaclient.api.AsyncCallback;
+import org.apache.helix.metaclient.api.MetaClientInterface;
+import org.apache.zookeeper.KeeperException;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestZkMetaClientAsyncOperations extends ZkMetaClientTestBase {
+
+ static TestAsyncContext[] asyncContext = new TestAsyncContext[1];
+ static final String entryKey = "/TestAsyncEntryKey";
+ static final String nonExistsEntry = "/a/b/c";
+ static final long LATCH_WAIT_TIMEOUT_IN_S = 3 * 60;
+
+ static class TestAsyncContext {
+ int _asyncCallSize;
+ CountDownLatch _countDownLatch;
+ int[] _returnCode;
+ MetaClientInterface.Stat[] _stats;
+ String[] _data;
+
+ TestAsyncContext(int callSize) {
+ _asyncCallSize = callSize;
+ _countDownLatch = new CountDownLatch(callSize);
+ _returnCode = new int[callSize];
+ _stats = new MetaClientInterface.Stat[callSize];
+ _data = new String[callSize];
+ }
+
+ public CountDownLatch getCountDownLatch() {
+ return _countDownLatch;
+ }
+
+ public void countDown() {
+ _countDownLatch.countDown();
+ }
+
+ public int getReturnCode(int idx) {
+ return _returnCode[idx];
+ }
+
+ public MetaClientInterface.Stat getStats(int idx) {
+ return _stats[idx];
+ }
+
+ public String getData(int idx) {
+ return _data[idx];
+ }
+
+ public void setReturnCodeWhenFinished(int idx, int returnCode) {
+ _returnCode[idx] = returnCode;
+ }
+
+ public void setStatWhenFinished(int idx, MetaClientInterface.Stat stat) {
+ _stats[idx] = stat;
+ }
+
+ public void setDataWhenFinished(int idx, String data) {
+ _data[idx] = data;
+ }
+ }
+
+ @Test
+ public void testAsyncCreateSetAndGet() {
+ asyncContext[0] = new TestAsyncContext(2);
+ try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
+ zkMetaClient.connect();
+
+ zkMetaClient
+ .asyncCreate(entryKey, "async_create-data", MetaClientInterface.EntryMode.PERSISTENT,
+ new AsyncCallback.VoidCallback() {
+ @Override
+ public void processResult(int returnCode, String key) {
+ asyncContext[0].setReturnCodeWhenFinished(0, returnCode);
+ asyncContext[0].countDown();
+ }
+ });
+
+ zkMetaClient.asyncCreate(nonExistsEntry, "async_create-data-invalid",
+ MetaClientInterface.EntryMode.PERSISTENT, new AsyncCallback.VoidCallback() {
+ @Override
+ public void processResult(int returnCode, String key) {
+ asyncContext[0].setReturnCodeWhenFinished(1, returnCode);
+ asyncContext[0].countDown();
+ }
+ });
+
+ asyncContext[0].getCountDownLatch().await(LATCH_WAIT_TIMEOUT_IN_S, TimeUnit.SECONDS);
+
+ Assert.assertEquals(asyncContext[0].getReturnCode(0), KeeperException.Code.OK.intValue());
+ Assert.assertEquals(asyncContext[0].getReturnCode(1), KeeperException.Code.NONODE.intValue());
+
+ // create the entry again and expect a duplicated error code
+ asyncContext[0] = new TestAsyncContext(1);
+ zkMetaClient
+ .asyncCreate(entryKey, "async_create-data", MetaClientInterface.EntryMode.PERSISTENT,
+ new AsyncCallback.VoidCallback() {
+ @Override
+ public void processResult(int returnCode, String key) {
+ asyncContext[0].setReturnCodeWhenFinished(0, returnCode);
+ asyncContext[0].countDown();
+ }
+ });
+ asyncContext[0].getCountDownLatch().await(LATCH_WAIT_TIMEOUT_IN_S, TimeUnit.SECONDS);
+ Assert.assertEquals(asyncContext[0].getReturnCode(0),
+ KeeperException.Code.NODEEXISTS.intValue());
+
+
+ // test set
+ asyncContext[0] = new TestAsyncContext(1);
+ zkMetaClient
+ .asyncSet(entryKey, "async_create-data-new", 0,
+ new AsyncCallback.StatCallback() {
+ @Override
+ public void processResult(int returnCode, String key,
+ @Nullable MetaClientInterface.Stat stat) {
+ asyncContext[0].setReturnCodeWhenFinished(0, returnCode);
+ asyncContext[0].setStatWhenFinished(0, stat);
+ asyncContext[0].countDown();
+ }
+ });
+ asyncContext[0].getCountDownLatch().await(LATCH_WAIT_TIMEOUT_IN_S, TimeUnit.SECONDS);
+ Assert.assertEquals(asyncContext[0].getReturnCode(0),
+ KeeperException.Code.OK.intValue());
+ Assert.assertEquals(asyncContext[0].getStats(0).getEntryType(),
+ MetaClientInterface.EntryMode.PERSISTENT);
+ Assert.assertEquals(asyncContext[0].getStats(0).getVersion(), 1);
+
+ // test get
+ asyncContext[0] = new TestAsyncContext(1);
+ zkMetaClient.asyncGet(entryKey, new AsyncCallback.DataCallback() {
+ @Override
+ public void processResult(int returnCode, String key, byte[] data,
+ MetaClientInterface.Stat stat) {
+ asyncContext[0].setReturnCodeWhenFinished(0, returnCode);
+ asyncContext[0].setStatWhenFinished(0, stat);
+ asyncContext[0].setDataWhenFinished(0, zkMetaClient.deserialize(data, key));
+ asyncContext[0].countDown();
+ }
+ });
+
+ asyncContext[0].getCountDownLatch().await(LATCH_WAIT_TIMEOUT_IN_S, TimeUnit.SECONDS);
+
+ Assert.assertEquals(asyncContext[0].getReturnCode(0), KeeperException.Code.OK.intValue());
+ Assert.assertEquals(asyncContext[0].getStats(0).getEntryType(),
+ MetaClientInterface.EntryMode.PERSISTENT);
+ Assert.assertEquals(asyncContext[0].getStats(0).getVersion(), 1);
+ Assert.assertEquals(asyncContext[0].getData(0), "async_create-data-new");
+ } catch (Exception ex) {
+ Assert.fail("Test testAsyncCreate failed because of:", ex);
+ }
+ }
+
+ @Test(dependsOnMethods = "testAsyncCreateSetAndGet")
+ public void testAsyncExistsAndDelete() {
+ asyncContext[0] = new TestAsyncContext(2);
+ try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
+ zkMetaClient.connect();
+
+ zkMetaClient.asyncExist(entryKey, new AsyncCallback.StatCallback() {
+ @Override
+ public void processResult(int returnCode, String key, MetaClientInterface.Stat stat) {
+ asyncContext[0].setReturnCodeWhenFinished(0, returnCode);
+ asyncContext[0].setStatWhenFinished(0, stat);
+ asyncContext[0].countDown();
+ }
+ });
+
+ zkMetaClient.asyncExist(nonExistsEntry, new AsyncCallback.StatCallback() {
+ @Override
+ public void processResult(int returnCode, String key, MetaClientInterface.Stat stat) {
+ asyncContext[0].setReturnCodeWhenFinished(1, returnCode);
+ asyncContext[0].setStatWhenFinished(1, stat);
+ asyncContext[0].countDown();
+ }
+ });
+
+ asyncContext[0].getCountDownLatch().await(LATCH_WAIT_TIMEOUT_IN_S, TimeUnit.SECONDS);
+
+ Assert.assertEquals(asyncContext[0].getReturnCode(0), KeeperException.Code.OK.intValue());
+ Assert.assertEquals(asyncContext[0].getStats(0).getEntryType(),
+ MetaClientInterface.EntryMode.PERSISTENT);
+ Assert.assertEquals(asyncContext[0].getStats(0).getVersion(), 1);
+ Assert.assertEquals(asyncContext[0].getReturnCode(1), KeeperException.Code.NONODE.intValue());
+ Assert.assertNull(asyncContext[0].getStats(1));
+
+ // test delete
+ asyncContext[0] = new TestAsyncContext(1);
+ zkMetaClient.asyncDelete(entryKey, new AsyncCallback.VoidCallback() {
+ @Override
+ public void processResult(int returnCode, String key) {
+ asyncContext[0].setReturnCodeWhenFinished(0, returnCode);
+ asyncContext[0].countDown();
+ }
+ });
+
+ asyncContext[0].getCountDownLatch().await(LATCH_WAIT_TIMEOUT_IN_S, TimeUnit.SECONDS);
+
+ Assert.assertEquals(asyncContext[0].getReturnCode(0), KeeperException.Code.OK.intValue());
+
+ // node should not be there
+ Assert.assertNull(zkMetaClient.get(entryKey));
+ } catch (InterruptedException ex) {
+ Assert.fail("Test testAsyncCreate failed because of:", ex);
+ }
+ }
+}
diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientTestBase.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientTestBase.java
new file mode 100644
index 000000000..cbc9832e1
--- /dev/null
+++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientTestBase.java
@@ -0,0 +1,91 @@
+package org.apache.helix.metaclient.impl.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
+import org.apache.helix.zookeeper.zkclient.IDefaultNameSpace;
+import org.apache.helix.zookeeper.zkclient.ZkServer;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
+
+
+public abstract class ZkMetaClientTestBase {
+
+ protected static final String ZK_ADDR = "localhost:2183";
+ protected static final int DEFAULT_TIMEOUT_MS = 1000;
+ protected static final String ENTRY_STRING_VALUE = "test-value";
+ private static ZkServer _zkServer;
+
+ /**
+ * Creates local Zk Server
+ * Note: Cannot test container / TTL node end to end behavior as
+ * the zk server setup doesn't allow for that. To enable this, zk server
+ * setup must invoke ContainerManager.java. However, the actual
+ * behavior has been verified to work on native ZK Client.
+ * TODO: Modify zk server setup to include ContainerManager.
+ * This can be done through ZooKeeperServerMain.java or
+ * LeaderZooKeeperServer.java.
+ */
+ @BeforeSuite
+ public void prepare() {
+ // start local zookeeper server
+ _zkServer = startZkServer(ZK_ADDR);
+ }
+
+ @AfterSuite
+ public void cleanUp() {
+ _zkServer.shutdown();
+ }
+
+ protected static ZkMetaClient<String> createZkMetaClient() {
+ ZkMetaClientConfig config =
+ new ZkMetaClientConfig.ZkMetaClientConfigBuilder().setConnectionAddress(ZK_ADDR)
+ //.setZkSerializer(new TestStringSerializer())
+ .build();
+ return new ZkMetaClient<>(config);
+ }
+
+ protected static ZkServer startZkServer(final String zkAddress) {
+ String zkDir = zkAddress.replace(':', '_');
+ final String logDir = "/tmp/" + zkDir + "/logs";
+ final String dataDir = "/tmp/" + zkDir + "/dataDir";
+
+ // Clean up local directory
+ try {
+ FileUtils.deleteDirectory(new File(dataDir));
+ FileUtils.deleteDirectory(new File(logDir));
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ IDefaultNameSpace defaultNameSpace = zkClient -> {
+ };
+
+ int port = Integer.parseInt(zkAddress.substring(zkAddress.lastIndexOf(':') + 1));
+ System.out.println("Starting ZK server at " + zkAddress);
+ ZkServer zkServer = new ZkServer(dataDir, logDir, defaultNameSpace, port);
+ zkServer.start();
+ return zkServer;
+ }
+}