You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by xy...@apache.org on 2023/04/27 20:36:26 UTC
[helix] 17/37: Fix ZkClient retry logic for customized callback and test
This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch metaclient
in repository https://gitbox.apache.org/repos/asf/helix.git
commit 57741e4c6dcb24277da3f7785ffad02d16e8a7fc
Author: xyuanlu <xy...@gmail.com>
AuthorDate: Mon Feb 6 22:27:01 2023 -0800
Fix ZkClient retry logic for customized callback and test
Fix ZkClient retry logic for customized callback and test
---
.../zkclient/callback/ZkAsyncCallbacks.java | 26 +++-
.../impl/client/TestZkClientAsyncRetry.java | 168 ++++++++++++++++++++-
2 files changed, 191 insertions(+), 3 deletions(-)
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java
index 72e2b95b7..99d9719df 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java
@@ -54,6 +54,14 @@ public class ZkAsyncCallbacks {
callback(rc, path, ctx);
}
+ public Stat getStat() {
+ return _stat;
+ }
+
+ public byte[] getData() {
+ return _data;
+ }
+
@Override
public void handle() {
// TODO Auto-generated method stub
@@ -61,7 +69,7 @@ public class ZkAsyncCallbacks {
@Override
protected void recordFailure(int rc, String path, ZkAsyncCallMonitorContext monitor) {
- if(rc != Code.NONODE.intValue()) {
+ if (rc != Code.NONODE.intValue()) {
monitor.recordFailure(path);
}
}
@@ -99,6 +107,10 @@ public class ZkAsyncCallbacks {
callback(rc, path, ctx);
}
+ public Stat getStat() {
+ return _stat;
+ }
+
@Override
public void handle() {
// TODO Auto-generated method stub
@@ -106,7 +118,7 @@ public class ZkAsyncCallbacks {
@Override
protected void recordFailure(int rc, String path, ZkAsyncCallMonitorContext monitor) {
- if(rc != Code.NONODE.intValue()) {
+ if (rc != Code.NONODE.intValue()) {
monitor.recordFailure(path);
}
}
@@ -182,6 +194,7 @@ public class ZkAsyncCallbacks {
public static abstract class DefaultCallback implements CancellableZkAsyncCallback {
AtomicBoolean _isOperationDone = new AtomicBoolean(false);
int _rc = KeeperException.Code.APIERROR.intValue();
+ String _path;
public void callback(int rc, String path, Object ctx) {
if (rc != 0 && LOG.isDebugEnabled()) {
@@ -198,12 +211,14 @@ public class ZkAsyncCallbacks {
}
_rc = rc;
+ _path = path;
// If retry is requested by passing the retry callback context, do retry if necessary.
if (needRetry(rc)) {
if (ctx != null && ctx instanceof ZkAsyncRetryCallContext) {
try {
if (((ZkAsyncRetryCallContext) ctx).requestRetry()) {
+ LOG.info("Received {} for async request on path {}, requested retry.", rc, path);
// The retry operation will be done asynchronously. Once it is done, the same callback
// handler object shall be triggered to ensure the result is notified to the right
// caller(s).
@@ -225,6 +240,8 @@ public class ZkAsyncCallbacks {
// If operation is done successfully or no retry needed, notify the caller(s).
try {
handle();
+ } catch (Exception ex) {
+ LOG.error("Exception while handling user callback for path {}.", _path, ex);
} finally {
markOperationDone();
}
@@ -259,9 +276,14 @@ public class ZkAsyncCallbacks {
return _rc;
}
+ public String getPath() {
+ return _path;
+ }
+
@Override
public void notifyCallers() {
LOG.warn("The callback {} has been cancelled.", this);
+ handle();
markOperationDone();
}
diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestZkClientAsyncRetry.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestZkClientAsyncRetry.java
index e55f8e6a9..ccbbde0d7 100644
--- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestZkClientAsyncRetry.java
+++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestZkClientAsyncRetry.java
@@ -238,11 +238,89 @@ public class TestZkClientAsyncRetry extends ZkTestBase {
}
}
+ @Test(dependsOnMethods = "testAsyncWriteRetry")
+ public void testAsyncRetryCustomizedCallback() throws JMException {
+ // int array storing the rc seen by each customized handle(). Initial value is 100, which
+ // is not used by any ZK return code.
+ final int[] _returnCode = new int[2];
+ _returnCode[0] = 100;
+ _returnCode[1] = 100;
+
+ // Customized callback whose handle() records the rc it observes.
+ class CustomizedSetCallback extends ZkAsyncCallbacks.SetDataCallbackHandler {
+ @Override
+ public void handle() {
+ _returnCode[0] = getRc();
+ }
+ }
+
+ MockAsyncZkClient testZkClient = new MockAsyncZkClient(_zkServerAddress);
+ try {
+ ZNRecord tmpRecord = new ZNRecord("tmpRecord");
+ tmpRecord.setSimpleField("foo", "bar");
+ testZkClient.createPersistent(NODE_PATH, tmpRecord);
+
+ // 1. Test async set retry
+ CustomizedSetCallback setCallback =
+ new CustomizedSetCallback();
+ Assert.assertEquals(setCallback.getRc(), KeeperException.Code.APIERROR.intValue());
+
+ tmpRecord.setSimpleField("test", "data");
+ testZkClient.setAsyncCallRC(CONNECTIONLOSS.intValue());
+ // Async set will be pending because the mock error rc is retryable.
+ testZkClient.asyncSetData(NODE_PATH, tmpRecord, -1, setCallback);
+ Assert.assertFalse(setCallback.isOperationDone());
+ Assert.assertEquals(setCallback.getRc(), CONNECTIONLOSS.intValue());
+ // handle() is not called until the retry finishes or is canceled; assert the default value.
+ Assert.assertEquals(_returnCode[0], 100);
+ // Change the mock return code.
+ testZkClient.setAsyncCallRC(KeeperException.Code.OK.intValue());
+ // Async retry will succeed now. Wait until the operation is successfully done and verify.
+ Assert.assertTrue(waitAsyncOperation(setCallback, RETRY_OPS_WAIT_TIMEOUT_MS));
+ Assert.assertEquals(setCallback.getRc(), KeeperException.Code.OK.intValue());
+ // handle() is called once the retry finishes; check the recorded return value.
+ Assert.assertEquals(_returnCode[0], KeeperException.Code.OK.intValue());
+ Assert.assertTrue(testZkClient.getAndResetRetryCount() >= 1);
+
+ // 2. Test async delete retry
+ class CustomizedDeleteCallback extends ZkAsyncCallbacks.DeleteCallbackHandler{
+ @Override
+ public void handle() {
+ _returnCode[1] = getRc();
+ }
+ }
+ CustomizedDeleteCallback deleteCallback =
+ new CustomizedDeleteCallback();
+ Assert.assertEquals(deleteCallback.getRc(), KeeperException.Code.APIERROR.intValue());
+
+ testZkClient.setAsyncCallRC(CONNECTIONLOSS.intValue());
+ // Async delete will be pending because the mock error rc is retryable.
+ testZkClient.asyncDelete(NODE_PATH, deleteCallback);
+ Assert.assertFalse(deleteCallback.isOperationDone());
+ Assert.assertEquals(deleteCallback.getRc(), CONNECTIONLOSS.intValue());
+ // handle() is not called until the retry finishes or is canceled; assert the default value.
+ Assert.assertEquals(_returnCode[1], 100);
+ // Change the mock return code.
+ testZkClient.setAsyncCallRC(KeeperException.Code.OK.intValue());
+ // Async retry will succeed now. Wait until the operation is successfully done and verify.
+ Assert.assertTrue(waitAsyncOperation(deleteCallback, RETRY_OPS_WAIT_TIMEOUT_MS));
+ Assert.assertEquals(deleteCallback.getRc(), KeeperException.Code.OK.intValue());
+ Assert.assertFalse(testZkClient.exists(NODE_PATH));
+ Assert.assertTrue(testZkClient.getAndResetRetryCount() >= 1);
+ // handle() is called once the retry finishes; check the recorded return value.
+ Assert.assertEquals(_returnCode[1], KeeperException.Code.OK.intValue());
+ } finally {
+ testZkClient.setAsyncCallRC(KeeperException.Code.OK.intValue());
+ testZkClient.close();
+ _zkClient.delete(NODE_PATH);
+ }
+ }
+
/*
* Tests if exception is thrown during retry operation,
* the context should be cancelled correctly.
*/
- @Test(dependsOnMethods = "testAsyncWriteRetry")
+ @Test(dependsOnMethods = "testAsyncRetryCustomizedCallback")
public void testAsyncWriteRetryThrowException() throws JMException {
MockAsyncZkClient testZkClient = new MockAsyncZkClient(_zkServerAddress);
try {
@@ -305,7 +383,95 @@ public class TestZkClientAsyncRetry extends ZkTestBase {
}
}
+ /*
+ * Tests that handle() is executed with the last rc when the callback retry is canceled.
+ */
@Test(dependsOnMethods = "testAsyncWriteRetryThrowException")
+ public void testAsyncRetryCustomizedCallbackCancel() throws JMException {
+ // int array storing the rc seen by each customized handle(). Initial value is 100, which
+ // is not used by any ZK return code.
+ final int[] _returnCode = new int[2];
+ _returnCode[0] = 100;
+ _returnCode[1] = 100;
+
+ // Customized callback whose handle() records the rc it observes.
+ class CustomizedCreateCallback extends ZkAsyncCallbacks.CreateCallbackHandler {
+ @Override
+ public void handle() {
+ _returnCode[0] = getRc();
+ }
+ }
+
+ MockAsyncZkClient testZkClient = new MockAsyncZkClient(_zkServerAddress);
+ try {
+ ZNRecord tmpRecord = new ZNRecord("tmpRecord");
+ tmpRecord.setSimpleField("foo", "bar");
+ testZkClient.createPersistent(NODE_PATH, tmpRecord);
+
+ // 1. Test async create retry
+ CustomizedCreateCallback createCallback =
+ new CustomizedCreateCallback();
+ Assert.assertEquals(createCallback.getRc(), KeeperException.Code.APIERROR.intValue());
+
+ tmpRecord.setSimpleField("test", "data");
+ testZkClient.setAsyncCallRC(CONNECTIONLOSS.intValue());
+ // Async create will be pending because the mock error rc is retryable.
+ testZkClient.asyncCreate(NODE_PATH, tmpRecord, CreateMode.PERSISTENT, createCallback);
+ Assert.assertFalse(createCallback.isOperationDone());
+ // Original callback should have return code set to CONNECTIONLOSS
+ Assert.assertEquals(createCallback.getRc(), CONNECTIONLOSS.intValue());
+ // handle() is not called until the retry finishes or is canceled; assert the default value.
+ Assert.assertEquals(_returnCode[0], 100);
+ // Throw exception in retry
+ testZkClient.setZkExceptionInRetry(true);
+ // The retry will now throw and the callback gets canceled. Wait until it is done and verify.
+ Assert.assertTrue(waitAsyncOperation(createCallback, RETRY_OPS_WAIT_TIMEOUT_MS),
+ "Async callback should have been canceled");
+ Assert.assertEquals(createCallback.getRc(), CONNECTIONLOSS.intValue());
+ Assert.assertEquals(_returnCode[0], CONNECTIONLOSS.intValue());
+ Assert.assertTrue(testZkClient.getAndResetRetryCount() >= 1);
+
+ // Restore the state
+ testZkClient.setZkExceptionInRetry(false);
+
+ class CustomizedSetCallback extends ZkAsyncCallbacks.SetDataCallbackHandler {
+ @Override
+ public void handle() {
+ _returnCode[1] = getRc();
+ }
+ }
+
+ // 2. Test async set retry
+ CustomizedSetCallback setCallback =
+ new CustomizedSetCallback();
+ Assert.assertEquals(setCallback.getRc(), KeeperException.Code.APIERROR.intValue());
+
+ tmpRecord.setSimpleField("test", "data");
+ testZkClient.setAsyncCallRC(CONNECTIONLOSS.intValue());
+ // Async set will be pending because the mock error rc is retryable.
+ testZkClient.asyncSetData(NODE_PATH, tmpRecord, -1, setCallback);
+ Assert.assertFalse(setCallback.isOperationDone());
+ // The pending set callback (not the earlier create one) should hold CONNECTIONLOSS.
+ Assert.assertEquals(setCallback.getRc(), CONNECTIONLOSS.intValue());
+ // handle() is not called until the retry finishes or is canceled; assert the default value.
+ Assert.assertEquals(_returnCode[1], 100);
+ // Throw exception in retry
+ testZkClient.setZkExceptionInRetry(true);
+ // The retry will now throw and the callback gets canceled. Wait until it is done and verify.
+ Assert.assertTrue(waitAsyncOperation(setCallback, RETRY_OPS_WAIT_TIMEOUT_MS),
+ "Async callback should have been canceled");
+ Assert.assertEquals(setCallback.getRc(), CONNECTIONLOSS.intValue());
+ Assert.assertEquals(_returnCode[1], CONNECTIONLOSS.intValue());
+ Assert.assertTrue(testZkClient.getAndResetRetryCount() >= 1);
+
+ } finally {
+ testZkClient.setAsyncCallRC(KeeperException.Code.OK.intValue());
+ testZkClient.close();
+ _zkClient.delete(NODE_PATH);
+ }
+ }
+
+ @Test(dependsOnMethods = "testAsyncRetryCustomizedCallbackCancel")
public void testAsyncReadRetry() throws JMException {
MockAsyncZkClient testZkClient = new MockAsyncZkClient(_zkServerAddress);
try {