You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by xy...@apache.org on 2023/02/22 22:33:27 UTC

[helix] branch metaclient updated: Implement zk Meta client async crud (#2354)

This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch metaclient
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/metaclient by this push:
     new 798ddbdf7 Implement zk Meta client async crud (#2354)
798ddbdf7 is described below

commit 798ddbdf718690e5786cb3506f4c309c6dcc63bc
Author: xyuanlu <xy...@gmail.com>
AuthorDate: Wed Feb 22 14:33:21 2023 -0800

    Implement zk Meta client async crud (#2354)
    
    Implement zk Meta client async crud
---
 .../apache/helix/metaclient/api/AsyncCallback.java |   2 +-
 .../helix/metaclient/api/MetaClientInterface.java  |  18 ++
 .../helix/metaclient/impl/zk/ZkMetaClient.java     |  88 +++++---
 .../zk/adapter/ZkMetaClientSetCallbackHandler.java |  42 ++++
 .../helix/metaclient/impl/zk/TestZkMetaClient.java |  67 +-----
 .../impl/zk/TestZkMetaClientAsyncOperations.java   | 233 +++++++++++++++++++++
 .../metaclient/impl/zk/ZkMetaClientTestBase.java   |  91 ++++++++
 7 files changed, 457 insertions(+), 84 deletions(-)

diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/api/AsyncCallback.java b/meta-client/src/main/java/org/apache/helix/metaclient/api/AsyncCallback.java
index 1e7c1eb75..ae7aed59b 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/api/AsyncCallback.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/api/AsyncCallback.java
@@ -28,8 +28,8 @@ import javax.annotation.Nullable;
  * The corresponding callback is registered when async CRUD API is invoked. Implementation processes
  * the result of each CRUD call. It should check return code and perform accordingly.
  */
-// TODO: define return code. failure code should map to MetaClient exceptions.
 public interface AsyncCallback {
+
   //This callback is used when stat object is returned from the operation.
   interface StatCallback extends AsyncCallback {
     /**
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java
index a4c5113f2..af9a170b4 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java
@@ -515,5 +515,23 @@ public interface MetaClientInterface<T> {
    */
   boolean waitUntilExists(String key, TimeUnit timeUnit, long timeOut);
 
+  /**
+   * Serialize the data in type T to a byte array. This function can be used in API that returns or
+   * has input value in byte array format.
+   * @param data to be serialized.
+   * @param path timeout unit
+   * @return
+   */
+   byte[] serialize(T data, String path);
+
+  /**
+   * Serialize a byte array to data in type T. This function can be used in API that returns or
+   * has input value in byte array format.
+   * @param bytes to be deserialized.
+   * @param path timeout unit
+   * @return
+   */
+   T deserialize(byte[] bytes, String path);
+
   // TODO: Secure CRUD APIs
 }
\ No newline at end of file
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
index 875eb6a50..f520e319b 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
@@ -36,6 +36,11 @@ import org.apache.helix.metaclient.api.OpResult;
 import org.apache.helix.metaclient.exception.MetaClientException;
 import org.apache.helix.metaclient.impl.zk.adapter.DataListenerAdapter;
 import org.apache.helix.metaclient.impl.zk.adapter.DirectChildListenerAdapter;
+import org.apache.helix.metaclient.impl.zk.adapter.ZkMetaClientCreateCallbackHandler;
+import org.apache.helix.metaclient.impl.zk.adapter.ZkMetaClientDeleteCallbackHandler;
+import org.apache.helix.metaclient.impl.zk.adapter.ZkMetaClientExistCallbackHandler;
+import org.apache.helix.metaclient.impl.zk.adapter.ZkMetaClientGetCallbackHandler;
+import org.apache.helix.metaclient.impl.zk.adapter.ZkMetaClientSetCallbackHandler;
 import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
 import org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil;
 import org.apache.helix.zookeeper.api.client.ChildrenSubscribeResult;
@@ -51,7 +56,7 @@ import static org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil.convertZ
 import static org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil.translateZkExceptionToMetaclientException;
 
 public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
-  private static final Logger LOG  = LoggerFactory.getLogger(ZkMetaClient.class);
+  private static final Logger LOG = LoggerFactory.getLogger(ZkMetaClient.class);
   private final ZkClient _zkClient;
   private final int _connectionTimeout;
 
@@ -93,7 +98,7 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
   }
 
   @Override
-  public T update( String key, DataUpdater<T> updater) {
+  public T update(String key, DataUpdater<T> updater) {
     org.apache.zookeeper.data.Stat stat = new org.apache.zookeeper.data.Stat();
     // TODO: add retry logic for ZkBadVersionException.
     try {
@@ -161,63 +166,76 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
   // thread. In our first version of implementation, we will keep similar behavior and have
   // callbacks executed in ZkClient event thread, and reuse zkclient retry logic.
 
-  // It is highly recommended NOT to perform any blocking operation inside the callbacks.
+  // It is highly recommended *NOT* to perform any blocking operation inside the callbacks.
   // If you block the thread the meta client won't process other events.
 
   // corresponding callbacks for each operation are invoked in order.
   @Override
   public void setAsyncExecPoolSize(int poolSize) {
-
+    throw new UnsupportedOperationException(
+        "All async calls are executed in a single thread to maintain sequence.");
   }
 
   @Override
   public void asyncCreate(String key, Object data, EntryMode mode, AsyncCallback.VoidCallback cb) {
-
-  }
-
-  @Override
-  public void asyncSet(String key, T data, int version, AsyncCallback.StatCallback cb) {
-
+    CreateMode entryMode;
+    try {
+      entryMode = ZkMetaClientUtil.convertMetaClientMode(mode);
+    } catch (ZkException | KeeperException e) {
+      throw new MetaClientException(e);
+    }
+    _zkClient.asyncCreate(key, data, entryMode,
+          new ZkMetaClientCreateCallbackHandler(cb));
   }
 
   @Override
-  public void asyncUpdate(String key, DataUpdater updater, AsyncCallback.DataCallback cb) {
-
+  public void asyncUpdate(String key, DataUpdater<T> updater, AsyncCallback.DataCallback cb) {
+    throw new NotImplementedException("Currently asyncUpdate is not supported in ZkMetaClient.");
+    /*
+     * TODO:  Only Helix has potential using this API as of now.  (ZkBaseDataAccessor.update())
+     *  Will move impl from ZkBaseDataAccessor to here when retiring ZkBaseDataAccessor.
+     */
   }
 
   @Override
   public void asyncGet(String key, AsyncCallback.DataCallback cb) {
-
+    _zkClient.asyncGetData(key,
+        new ZkMetaClientGetCallbackHandler(cb));
   }
 
   @Override
   public void asyncCountChildren(String key, AsyncCallback.DataCallback cb) {
+    throw new NotImplementedException(
+        "Currently asyncCountChildren is not supported in ZkMetaClient.");
+    /*
+     * TODO:  Only Helix has potential using this API as of now. (ZkBaseDataAccessor.getChildren())
+     *  Will move impl from ZkBaseDataAccessor to here when retiring ZkBaseDataAccessor.
+     */
 
   }
 
   @Override
   public void asyncExist(String key, AsyncCallback.StatCallback cb) {
-
+    _zkClient.asyncExists(key,
+        new ZkMetaClientExistCallbackHandler(cb));
   }
 
-  @Override
-  public void asyncDelete(String keys, AsyncCallback.VoidCallback cb) {
-
+  public void asyncDelete(String key, AsyncCallback.VoidCallback cb) {
+    _zkClient.asyncDelete(key, new ZkMetaClientDeleteCallbackHandler(cb));
   }
 
   @Override
-  public boolean[] create(List key, List data, List mode) {
-    return new boolean[0];
-  }
+  public void asyncTransaction(Iterable<Op> ops, AsyncCallback.TransactionCallback cb) {
+    throw new NotImplementedException(
+        "Currently asyncTransaction is not supported in ZkMetaClient.");
 
-  @Override
-  public boolean[] create(List key, List data) {
-    return new boolean[0];
+     //TODO: There is no active use case for Async transaction.
   }
 
   @Override
-  public void asyncTransaction(Iterable iterable, AsyncCallback.TransactionCallback cb) {
-
+  public void asyncSet(String key, T data, int version, AsyncCallback.StatCallback cb) {
+    _zkClient.asyncSetData(key, data, version,
+        new ZkMetaClientSetCallbackHandler(cb));
   }
 
   @Override
@@ -290,6 +308,16 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
     return false;
   }
 
+  @Override
+  public boolean[] create(List key, List data, List mode) {
+    return new boolean[0];
+  }
+
+  @Override
+  public boolean[] create(List key, List data) {
+    return new boolean[0];
+  }
+
   @Override
   public boolean[] delete(List keys) {
     return new boolean[0];
@@ -329,4 +357,14 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
     // Convert list of Zk OpResults to MetaClient OpResults
     return ZkMetaClientUtil.zkOpResultToMetaClientOpResults(zkResult);
   }
+
+  @Override
+  public byte[] serialize(T data, String path) {
+    return _zkClient.serialize(data, path);
+  }
+
+  @Override
+  public T deserialize(byte[] bytes, String path) {
+    return _zkClient.deserialize(bytes, path);
+  }
 }
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/ZkMetaClientSetCallbackHandler.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/ZkMetaClientSetCallbackHandler.java
new file mode 100644
index 000000000..60eecc7f8
--- /dev/null
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/ZkMetaClientSetCallbackHandler.java
@@ -0,0 +1,42 @@
+package org.apache.helix.metaclient.impl.zk.adapter;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.metaclient.api.AsyncCallback;
+import org.apache.helix.metaclient.api.MetaClientInterface;
+import org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil;
+import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks;
+
+
+public class ZkMetaClientSetCallbackHandler extends ZkAsyncCallbacks.SetDataCallbackHandler {
+  AsyncCallback.StatCallback userCallback;
+
+  public ZkMetaClientSetCallbackHandler(AsyncCallback.StatCallback cb) {
+    userCallback = cb;
+  }
+
+  @Override
+  public void handle() {
+    userCallback.processResult(getRc(), getPath(), getStat() == null ? null
+        : new MetaClientInterface.Stat(
+            ZkMetaClientUtil.convertZkEntryModeToMetaClientEntryMode(getStat().getEphemeralOwner()),
+            getStat().getVersion()));
+  }
+}
diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
index 70a7afccd..f23ed0f03 100644
--- a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
+++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
@@ -27,6 +27,13 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.helix.metaclient.api.DataUpdater;
+import org.apache.helix.metaclient.api.MetaClientInterface;
+import org.apache.helix.metaclient.exception.MetaClientException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.helix.metaclient.api.DataUpdater;
 import org.apache.helix.metaclient.api.DirectChildChangeListener;
@@ -55,7 +62,7 @@ import static org.apache.helix.metaclient.api.MetaClientInterface.EntryMode.CONT
 import static org.apache.helix.metaclient.api.MetaClientInterface.EntryMode.PERSISTENT;
 
 
-public class TestZkMetaClient {
+public class TestZkMetaClient extends ZkMetaClientTestBase{
 
   private static final String ZK_ADDR = "localhost:2183";
   private static final int DEFAULT_TIMEOUT_MS = 1000;
@@ -65,31 +72,6 @@ public class TestZkMetaClient {
 
   private final Object _syncObject = new Object();
 
-
-  private ZkServer _zkServer;
-
-  /**
-   * Creates local Zk Server
-   * Note: Cannot test container / TTL node end to end behavior as
-   * the zk server setup doesn't allow for that. To enable this, zk server
-   * setup must invoke ContainerManager.java. However, the actual
-   * behavior has been verified to work on native ZK Client.
-   * TODO: Modify zk server setup to include ContainerManager.
-   * This can be done through ZooKeeperServerMain.java or
-   * LeaderZooKeeperServer.java.
-   */
-  @BeforeClass
-  public void prepare() {
-    System.setProperty("zookeeper.extendedTypesEnabled", "true");
-    // start local zookeeper server
-    _zkServer = startZkServer(ZK_ADDR);
-  }
-
-  @AfterClass
-  public void cleanUp() {
-    _zkServer.shutdown();
-  }
-
   @Test
   public void testCreate() {
     final String key = "/TestZkMetaClient_testCreate";
@@ -115,7 +97,7 @@ public class TestZkMetaClient {
       Assert.assertNotNull(zkMetaClient.exists(key));
     }
   }
-  
+
   @Test
   public void testGet() {
     final String key = "/TestZkMetaClient_testGet";
@@ -352,37 +334,6 @@ public class TestZkMetaClient {
     }
   }
 
-  // TODO: Create a ZkMetadata test base class and move these helper to base class when more tests
-  // are added.
-  private static ZkMetaClient<String> createZkMetaClient() {
-    ZkMetaClientConfig config =
-        new ZkMetaClientConfig.ZkMetaClientConfigBuilder().setConnectionAddress(ZK_ADDR).build();
-    return new ZkMetaClient<>(config);
-  }
-
-  private static ZkServer startZkServer(final String zkAddress) {
-    String zkDir = zkAddress.replace(':', '_');
-    final String logDir = "/tmp/" + zkDir + "/logs";
-    final String dataDir = "/tmp/" + zkDir + "/dataDir";
-
-    // Clean up local directory
-    try {
-      FileUtils.deleteDirectory(new File(dataDir));
-      FileUtils.deleteDirectory(new File(logDir));
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
-
-    IDefaultNameSpace defaultNameSpace = zkClient -> {
-    };
-
-    int port = Integer.parseInt(zkAddress.substring(zkAddress.lastIndexOf(':') + 1));
-    System.out.println("Starting ZK server at " + zkAddress);
-    ZkServer zkServer = new ZkServer(dataDir, logDir, defaultNameSpace, port);
-    zkServer.start();
-    return zkServer;
-  }
-
   /**
    * Transactional op calls zk.multi() with a set of ops (operations)
    * and the return values are converted into metaclient opResults.
diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClientAsyncOperations.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClientAsyncOperations.java
new file mode 100644
index 000000000..fbfd8ed35
--- /dev/null
+++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClientAsyncOperations.java
@@ -0,0 +1,233 @@
+package org.apache.helix.metaclient.impl.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nullable;
+
+import org.apache.helix.metaclient.api.AsyncCallback;
+import org.apache.helix.metaclient.api.MetaClientInterface;
+import org.apache.zookeeper.KeeperException;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestZkMetaClientAsyncOperations extends ZkMetaClientTestBase {
+
+  static TestAsyncContext[] asyncContext = new TestAsyncContext[1];
+  static final String entryKey = "/TestAsyncEntryKey";
+  static final String nonExistsEntry = "/a/b/c";
+  static final long LATCH_WAIT_TIMEOUT_IN_S = 3 * 60;
+
+  static class TestAsyncContext {
+    int _asyncCallSize;
+    CountDownLatch _countDownLatch;
+    int[] _returnCode;
+    MetaClientInterface.Stat[] _stats;
+    String[] _data;
+
+    TestAsyncContext(int callSize) {
+      _asyncCallSize = callSize;
+      _countDownLatch = new CountDownLatch(callSize);
+      _returnCode = new int[callSize];
+      _stats = new MetaClientInterface.Stat[callSize];
+      _data = new String[callSize];
+    }
+
+    public CountDownLatch getCountDownLatch() {
+      return _countDownLatch;
+    }
+
+    public void countDown() {
+      _countDownLatch.countDown();
+    }
+
+    public int getReturnCode(int idx) {
+      return _returnCode[idx];
+    }
+
+    public MetaClientInterface.Stat getStats(int idx) {
+      return _stats[idx];
+    }
+
+    public String getData(int idx) {
+      return _data[idx];
+    }
+
+    public void setReturnCodeWhenFinished(int idx, int returnCode) {
+      _returnCode[idx] = returnCode;
+    }
+
+    public void setStatWhenFinished(int idx, MetaClientInterface.Stat stat) {
+      _stats[idx] = stat;
+    }
+
+    public void setDataWhenFinished(int idx, String data) {
+      _data[idx] = data;
+    }
+  }
+
+  @Test
+  public void testAsyncCreateSetAndGet() {
+    asyncContext[0] = new TestAsyncContext(2);
+    try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
+      zkMetaClient.connect();
+
+      zkMetaClient
+          .asyncCreate(entryKey, "async_create-data", MetaClientInterface.EntryMode.PERSISTENT,
+              new AsyncCallback.VoidCallback() {
+                @Override
+                public void processResult(int returnCode, String key) {
+                  asyncContext[0].setReturnCodeWhenFinished(0, returnCode);
+                  asyncContext[0].countDown();
+                }
+              });
+
+      zkMetaClient.asyncCreate(nonExistsEntry, "async_create-data-invalid",
+          MetaClientInterface.EntryMode.PERSISTENT, new AsyncCallback.VoidCallback() {
+            @Override
+            public void processResult(int returnCode, String key) {
+              asyncContext[0].setReturnCodeWhenFinished(1, returnCode);
+              asyncContext[0].countDown();
+            }
+          });
+
+      asyncContext[0].getCountDownLatch().await(LATCH_WAIT_TIMEOUT_IN_S, TimeUnit.SECONDS);
+
+      Assert.assertEquals(asyncContext[0].getReturnCode(0), KeeperException.Code.OK.intValue());
+      Assert.assertEquals(asyncContext[0].getReturnCode(1), KeeperException.Code.NONODE.intValue());
+
+      // create the entry again and expect a duplicated error code
+      asyncContext[0] = new TestAsyncContext(1);
+      zkMetaClient
+          .asyncCreate(entryKey, "async_create-data", MetaClientInterface.EntryMode.PERSISTENT,
+              new AsyncCallback.VoidCallback() {
+                @Override
+                public void processResult(int returnCode, String key) {
+                  asyncContext[0].setReturnCodeWhenFinished(0, returnCode);
+                  asyncContext[0].countDown();
+                }
+              });
+      asyncContext[0].getCountDownLatch().await(LATCH_WAIT_TIMEOUT_IN_S, TimeUnit.SECONDS);
+      Assert.assertEquals(asyncContext[0].getReturnCode(0),
+          KeeperException.Code.NODEEXISTS.intValue());
+
+
+      // test set
+      asyncContext[0] = new TestAsyncContext(1);
+      zkMetaClient
+          .asyncSet(entryKey, "async_create-data-new", 0,
+              new AsyncCallback.StatCallback() {
+                @Override
+                public void processResult(int returnCode, String key,
+                    @Nullable MetaClientInterface.Stat stat) {
+                  asyncContext[0].setReturnCodeWhenFinished(0, returnCode);
+                  asyncContext[0].setStatWhenFinished(0, stat);
+                  asyncContext[0].countDown();
+                }
+              });
+      asyncContext[0].getCountDownLatch().await(LATCH_WAIT_TIMEOUT_IN_S, TimeUnit.SECONDS);
+      Assert.assertEquals(asyncContext[0].getReturnCode(0),
+          KeeperException.Code.OK.intValue());
+      Assert.assertEquals(asyncContext[0].getStats(0).getEntryType(),
+          MetaClientInterface.EntryMode.PERSISTENT);
+      Assert.assertEquals(asyncContext[0].getStats(0).getVersion(), 1);
+
+      // test get
+      asyncContext[0] = new TestAsyncContext(1);
+      zkMetaClient.asyncGet(entryKey, new AsyncCallback.DataCallback() {
+        @Override
+        public void processResult(int returnCode, String key, byte[] data,
+            MetaClientInterface.Stat stat) {
+          asyncContext[0].setReturnCodeWhenFinished(0, returnCode);
+          asyncContext[0].setStatWhenFinished(0, stat);
+          asyncContext[0].setDataWhenFinished(0, zkMetaClient.deserialize(data, key));
+          asyncContext[0].countDown();
+        }
+      });
+
+      asyncContext[0].getCountDownLatch().await(LATCH_WAIT_TIMEOUT_IN_S, TimeUnit.SECONDS);
+
+      Assert.assertEquals(asyncContext[0].getReturnCode(0), KeeperException.Code.OK.intValue());
+      Assert.assertEquals(asyncContext[0].getStats(0).getEntryType(),
+          MetaClientInterface.EntryMode.PERSISTENT);
+      Assert.assertEquals(asyncContext[0].getStats(0).getVersion(), 1);
+      Assert.assertEquals(asyncContext[0].getData(0), "async_create-data-new");
+    } catch (Exception ex) {
+      Assert.fail("Test testAsyncCreate failed because of:", ex);
+    }
+  }
+
+  @Test(dependsOnMethods = "testAsyncCreateSetAndGet")
+  public void testAsyncExistsAndDelete() {
+    asyncContext[0] = new TestAsyncContext(2);
+    try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
+      zkMetaClient.connect();
+
+      zkMetaClient.asyncExist(entryKey, new AsyncCallback.StatCallback() {
+        @Override
+        public void processResult(int returnCode, String key, MetaClientInterface.Stat stat) {
+          asyncContext[0].setReturnCodeWhenFinished(0, returnCode);
+          asyncContext[0].setStatWhenFinished(0, stat);
+          asyncContext[0].countDown();
+        }
+      });
+
+      zkMetaClient.asyncExist(nonExistsEntry, new AsyncCallback.StatCallback() {
+        @Override
+        public void processResult(int returnCode, String key, MetaClientInterface.Stat stat) {
+          asyncContext[0].setReturnCodeWhenFinished(1, returnCode);
+          asyncContext[0].setStatWhenFinished(1, stat);
+          asyncContext[0].countDown();
+        }
+      });
+
+      asyncContext[0].getCountDownLatch().await(LATCH_WAIT_TIMEOUT_IN_S, TimeUnit.SECONDS);
+
+      Assert.assertEquals(asyncContext[0].getReturnCode(0), KeeperException.Code.OK.intValue());
+      Assert.assertEquals(asyncContext[0].getStats(0).getEntryType(),
+          MetaClientInterface.EntryMode.PERSISTENT);
+      Assert.assertEquals(asyncContext[0].getStats(0).getVersion(), 1);
+      Assert.assertEquals(asyncContext[0].getReturnCode(1), KeeperException.Code.NONODE.intValue());
+      Assert.assertNull(asyncContext[0].getStats(1));
+
+      // test delete
+      asyncContext[0] = new TestAsyncContext(1);
+      zkMetaClient.asyncDelete(entryKey, new AsyncCallback.VoidCallback() {
+        @Override
+        public void processResult(int returnCode, String key) {
+          asyncContext[0].setReturnCodeWhenFinished(0, returnCode);
+          asyncContext[0].countDown();
+        }
+      });
+
+      asyncContext[0].getCountDownLatch().await(LATCH_WAIT_TIMEOUT_IN_S, TimeUnit.SECONDS);
+
+      Assert.assertEquals(asyncContext[0].getReturnCode(0), KeeperException.Code.OK.intValue());
+
+      // node should not be there
+      Assert.assertNull(zkMetaClient.get(entryKey));
+    } catch (InterruptedException ex) {
+      Assert.fail("Test testAsyncCreate failed because of:", ex);
+    }
+  }
+}
diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientTestBase.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientTestBase.java
new file mode 100644
index 000000000..cbc9832e1
--- /dev/null
+++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/ZkMetaClientTestBase.java
@@ -0,0 +1,91 @@
+package org.apache.helix.metaclient.impl.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
+import org.apache.helix.zookeeper.zkclient.IDefaultNameSpace;
+import org.apache.helix.zookeeper.zkclient.ZkServer;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
+
+
+public abstract class ZkMetaClientTestBase {
+
+  protected static final String ZK_ADDR = "localhost:2183";
+  protected static final int DEFAULT_TIMEOUT_MS = 1000;
+  protected static final String ENTRY_STRING_VALUE = "test-value";
+  private static ZkServer _zkServer;
+
+  /**
+   * Creates local Zk Server
+   * Note: Cannot test container / TTL node end to end behavior as
+   * the zk server setup doesn't allow for that. To enable this, zk server
+   * setup must invoke ContainerManager.java. However, the actual
+   * behavior has been verified to work on native ZK Client.
+   * TODO: Modify zk server setup to include ContainerManager.
+   * This can be done through ZooKeeperServerMain.java or
+   * LeaderZooKeeperServer.java.
+   */
+  @BeforeSuite
+  public void prepare() {
+    // start local zookeeper server
+    _zkServer = startZkServer(ZK_ADDR);
+  }
+
+  @AfterSuite
+  public void cleanUp() {
+    _zkServer.shutdown();
+  }
+
+  protected static ZkMetaClient<String> createZkMetaClient() {
+    ZkMetaClientConfig config =
+        new ZkMetaClientConfig.ZkMetaClientConfigBuilder().setConnectionAddress(ZK_ADDR)
+            //.setZkSerializer(new TestStringSerializer())
+            .build();
+    return new ZkMetaClient<>(config);
+  }
+
+  protected static ZkServer startZkServer(final String zkAddress) {
+    String zkDir = zkAddress.replace(':', '_');
+    final String logDir = "/tmp/" + zkDir + "/logs";
+    final String dataDir = "/tmp/" + zkDir + "/dataDir";
+
+    // Clean up local directory
+    try {
+      FileUtils.deleteDirectory(new File(dataDir));
+      FileUtils.deleteDirectory(new File(logDir));
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+
+    IDefaultNameSpace defaultNameSpace = zkClient -> {
+    };
+
+    int port = Integer.parseInt(zkAddress.substring(zkAddress.lastIndexOf(':') + 1));
+    System.out.println("Starting ZK server at " + zkAddress);
+    ZkServer zkServer = new ZkServer(dataDir, logDir, defaultNameSpace, port);
+    zkServer.start();
+    return zkServer;
+  }
+}