You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by xy...@apache.org on 2023/04/27 20:36:26 UTC

[helix] 17/37: Fix ZkClient retry logic for customized callback and test

This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch metaclient
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 57741e4c6dcb24277da3f7785ffad02d16e8a7fc
Author: xyuanlu <xy...@gmail.com>
AuthorDate: Mon Feb 6 22:27:01 2023 -0800

    Fix ZkClient retry logic for customized callback and test
    
    Fix ZkClient retry logic for customized callback and test
---
 .../zkclient/callback/ZkAsyncCallbacks.java        |  26 +++-
 .../impl/client/TestZkClientAsyncRetry.java        | 168 ++++++++++++++++++++-
 2 files changed, 191 insertions(+), 3 deletions(-)

diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java
index 72e2b95b7..99d9719df 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java
@@ -54,6 +54,14 @@ public class ZkAsyncCallbacks {
       callback(rc, path, ctx);
     }
 
+    public Stat getStat() {
+      return _stat;
+    }
+
+    public byte[] getData() {
+      return _data;
+    }
+
     @Override
     public void handle() {
       // TODO Auto-generated method stub
@@ -61,7 +69,7 @@ public class ZkAsyncCallbacks {
 
     @Override
     protected void recordFailure(int rc, String path, ZkAsyncCallMonitorContext monitor) {
-      if(rc != Code.NONODE.intValue()) {
+      if (rc != Code.NONODE.intValue()) {
         monitor.recordFailure(path);
       }
     }
@@ -99,6 +107,10 @@ public class ZkAsyncCallbacks {
       callback(rc, path, ctx);
     }
 
+    public Stat getStat() {
+      return _stat;
+    }
+
     @Override
     public void handle() {
       // TODO Auto-generated method stub
@@ -106,7 +118,7 @@ public class ZkAsyncCallbacks {
 
     @Override
     protected void recordFailure(int rc, String path, ZkAsyncCallMonitorContext monitor) {
-      if(rc != Code.NONODE.intValue()) {
+      if (rc != Code.NONODE.intValue()) {
         monitor.recordFailure(path);
       }
     }
@@ -182,6 +194,7 @@ public class ZkAsyncCallbacks {
   public static abstract class DefaultCallback implements CancellableZkAsyncCallback {
     AtomicBoolean _isOperationDone = new AtomicBoolean(false);
     int _rc = KeeperException.Code.APIERROR.intValue();
+    String _path;
 
     public void callback(int rc, String path, Object ctx) {
       if (rc != 0 && LOG.isDebugEnabled()) {
@@ -198,12 +211,14 @@ public class ZkAsyncCallbacks {
       }
 
       _rc = rc;
+      _path = path;
 
       // If retry is requested by passing the retry callback context, do retry if necessary.
       if (needRetry(rc)) {
         if (ctx != null && ctx instanceof ZkAsyncRetryCallContext) {
           try {
             if (((ZkAsyncRetryCallContext) ctx).requestRetry()) {
+              LOG.info("Received {} for async request on path {}, requested retry.", rc, path);
               // The retry operation will be done asynchronously. Once it is done, the same callback
               // handler object shall be triggered to ensure the result is notified to the right
               // caller(s).
@@ -225,6 +240,8 @@ public class ZkAsyncCallbacks {
       // If operation is done successfully or no retry needed, notify the caller(s).
       try {
         handle();
+      } catch (Exception ex) {
+        LOG.error("Exception while handling user callback for path {}.", _path, ex);
       } finally {
         markOperationDone();
       }
@@ -259,9 +276,14 @@ public class ZkAsyncCallbacks {
       return _rc;
     }
 
+    public String getPath() {
+      return _path;
+    }
+
     @Override
     public void notifyCallers() {
       LOG.warn("The callback {} has been cancelled.", this);
+      handle();
       markOperationDone();
     }
 
diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestZkClientAsyncRetry.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestZkClientAsyncRetry.java
index e55f8e6a9..ccbbde0d7 100644
--- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestZkClientAsyncRetry.java
+++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestZkClientAsyncRetry.java
@@ -238,11 +238,89 @@ public class TestZkClientAsyncRetry extends ZkTestBase {
     }
   }
 
+  @Test(dependsOnMethods = "testAsyncWriteRetry")
+  public void testAsyncRetryCustomizedCallback() throws JMException {
+    // int array to store customized async callback return value. Initial value set to 100, witch
+    // not used by any ZK return code.
+    final int[] _returnCode = new int[2];
+    _returnCode[0] = 100;
+    _returnCode[1] = 100;
+
+    // Define Customized callback
+    class CustomizedSetCallback extends ZkAsyncCallbacks.SetDataCallbackHandler {
+      @Override
+      public void handle() {
+        _returnCode[0] = getRc();
+      }
+    }
+
+    MockAsyncZkClient testZkClient = new MockAsyncZkClient(_zkServerAddress);
+    try {
+      ZNRecord tmpRecord = new ZNRecord("tmpRecord");
+      tmpRecord.setSimpleField("foo", "bar");
+      testZkClient.createPersistent(NODE_PATH, tmpRecord);
+
+      // 1. Test async set retry
+      CustomizedSetCallback setCallback =
+          new CustomizedSetCallback();
+      Assert.assertEquals(setCallback.getRc(), KeeperException.Code.APIERROR.intValue());
+
+      tmpRecord.setSimpleField("test", "data");
+      testZkClient.setAsyncCallRC(CONNECTIONLOSS.intValue());
+      // Async set will be pending due to the mock error rc is retryable.
+      testZkClient.asyncSetData(NODE_PATH, tmpRecord, -1, setCallback);
+      Assert.assertFalse(setCallback.isOperationDone());
+      Assert.assertEquals(setCallback.getRc(), CONNECTIONLOSS.intValue());
+      // handle() haven't been called until retry finished or canceled, assert it is default value.
+      Assert.assertEquals(_returnCode[0], 100);
+      // Change the mock return code.
+      testZkClient.setAsyncCallRC(KeeperException.Code.OK.intValue());
+      // Async retry will succeed now. Wait until the operation is successfully done and verify.
+      Assert.assertTrue(waitAsyncOperation(setCallback, RETRY_OPS_WAIT_TIMEOUT_MS));
+      Assert.assertEquals(setCallback.getRc(), KeeperException.Code.OK.intValue());
+      // handle() called when retry finished, check return value.
+      Assert.assertEquals(_returnCode[0], KeeperException.Code.OK.intValue());
+      Assert.assertTrue(testZkClient.getAndResetRetryCount() >= 1);
+
+      // 2. Test async delete
+      class CustomizedDeleteCallback extends ZkAsyncCallbacks.DeleteCallbackHandler{
+        @Override
+        public void handle() {
+          _returnCode[1] = getRc();
+        }
+      }
+      CustomizedDeleteCallback deleteCallback =
+          new CustomizedDeleteCallback();
+      Assert.assertEquals(deleteCallback.getRc(), KeeperException.Code.APIERROR.intValue());
+
+      testZkClient.setAsyncCallRC(CONNECTIONLOSS.intValue());
+      // Async delete will be pending due to the mock error rc is retryable.
+      testZkClient.asyncDelete(NODE_PATH, deleteCallback);
+      Assert.assertFalse(deleteCallback.isOperationDone());
+      Assert.assertEquals(deleteCallback.getRc(), CONNECTIONLOSS.intValue());
+      // handle() haven't been called until retry finished or canceled, assert it is default value.
+      Assert.assertEquals(_returnCode[1], 100);
+      // Change the mock return code.
+      testZkClient.setAsyncCallRC(KeeperException.Code.OK.intValue());
+      // Async retry will succeed now. Wait until the operation is successfully done and verify.
+      Assert.assertTrue(waitAsyncOperation(deleteCallback, RETRY_OPS_WAIT_TIMEOUT_MS));
+      Assert.assertEquals(deleteCallback.getRc(), KeeperException.Code.OK.intValue());
+      Assert.assertFalse(testZkClient.exists(NODE_PATH));
+      Assert.assertTrue(testZkClient.getAndResetRetryCount() >= 1);
+      // handle() called when retry finished, check return value.
+      Assert.assertEquals(_returnCode[1], KeeperException.Code.OK.intValue());
+    } finally {
+      testZkClient.setAsyncCallRC(KeeperException.Code.OK.intValue());
+      testZkClient.close();
+      _zkClient.delete(NODE_PATH);
+    }
+  }
+
   /*
    * Tests if exception is thrown during retry operation,
    * the context should be cancelled correctly.
    */
-  @Test(dependsOnMethods = "testAsyncWriteRetry")
+  @Test(dependsOnMethods = "testAsyncRetryCustomizedCallback")
   public void testAsyncWriteRetryThrowException() throws JMException {
     MockAsyncZkClient testZkClient = new MockAsyncZkClient(_zkServerAddress);
     try {
@@ -305,7 +383,95 @@ public class TestZkClientAsyncRetry extends ZkTestBase {
     }
   }
 
+  /*
+   * Test handle() is executed once if callback retry is canceled.
+   */
   @Test(dependsOnMethods = "testAsyncWriteRetryThrowException")
+  public void testAsyncRetryCustomizedCallbackCancel() throws JMException {
+    // int array to store customized async callback return value. Initial value set to 100, witch
+    // not used by any ZK return code.
+    final int[] _returnCode = new int[2];
+    _returnCode[0] = 100;
+    _returnCode[1] = 100;
+
+    // Define Customized callback
+    class CustomizedCreateCallback extends ZkAsyncCallbacks.CreateCallbackHandler {
+      @Override
+      public void handle() {
+        _returnCode[0] = getRc();
+      }
+    }
+
+    MockAsyncZkClient testZkClient = new MockAsyncZkClient(_zkServerAddress);
+    try {
+      ZNRecord tmpRecord = new ZNRecord("tmpRecord");
+      tmpRecord.setSimpleField("foo", "bar");
+      testZkClient.createPersistent(NODE_PATH, tmpRecord);
+
+      // 1. Test async create retry
+      CustomizedCreateCallback createCallback =
+          new CustomizedCreateCallback();
+      Assert.assertEquals(createCallback.getRc(), KeeperException.Code.APIERROR.intValue());
+
+      tmpRecord.setSimpleField("test", "data");
+      testZkClient.setAsyncCallRC(CONNECTIONLOSS.intValue());
+      // Async set will be pending due to the mock error rc is retryable.
+      testZkClient.asyncCreate(NODE_PATH, tmpRecord, CreateMode.PERSISTENT, createCallback);
+      Assert.assertFalse(createCallback.isOperationDone());
+      // Original callback should have return code set to CONNECTIONLOSS
+      Assert.assertEquals(createCallback.getRc(), CONNECTIONLOSS.intValue());
+      // handle() haven't been called until retry finished or canceled, assert it is default value.
+      Assert.assertEquals(_returnCode[0], 100);
+      // Throw exception in retry
+      testZkClient.setZkExceptionInRetry(true);
+      // Async retry will succeed now. Wait until the operation is done and verify.
+      Assert.assertTrue(waitAsyncOperation(createCallback, RETRY_OPS_WAIT_TIMEOUT_MS),
+          "Async callback should have been canceled");
+      Assert.assertEquals(createCallback.getRc(), CONNECTIONLOSS.intValue());
+      Assert.assertEquals(_returnCode[0], CONNECTIONLOSS.intValue());
+      Assert.assertTrue(testZkClient.getAndResetRetryCount() >= 1);
+
+      // Restore the state
+      testZkClient.setZkExceptionInRetry(false);
+
+      class CustomizedSetCallback extends ZkAsyncCallbacks.SetDataCallbackHandler {
+        @Override
+        public void handle() {
+          _returnCode[1] = getRc();
+        }
+      }
+
+      // 1. Test async set retry
+      CustomizedSetCallback setCallback =
+          new CustomizedSetCallback();
+      Assert.assertEquals(setCallback.getRc(), KeeperException.Code.APIERROR.intValue());
+
+      tmpRecord.setSimpleField("test", "data");
+      testZkClient.setAsyncCallRC(CONNECTIONLOSS.intValue());
+      // Async set will be pending due to the mock error rc is retryable.
+      testZkClient.asyncSetData(NODE_PATH, tmpRecord, -1, setCallback);
+      Assert.assertFalse(setCallback.isOperationDone());
+      // Original callback should have return code set to CONNECTIONLOSS
+      Assert.assertEquals(createCallback.getRc(), CONNECTIONLOSS.intValue());
+      // handle() haven't been called until retry finished or canceled, assert it is default value.
+      Assert.assertEquals(_returnCode[1], 100);
+      // Throw exception in retry
+      testZkClient.setZkExceptionInRetry(true);
+      // Async retry will succeed now. Wait until the operation is done and verify.
+      Assert.assertTrue(waitAsyncOperation(setCallback, RETRY_OPS_WAIT_TIMEOUT_MS),
+          "Async callback should have been canceled");
+      Assert.assertEquals(setCallback.getRc(), CONNECTIONLOSS.intValue());
+      Assert.assertEquals(_returnCode[1], CONNECTIONLOSS.intValue());
+      Assert.assertTrue(testZkClient.getAndResetRetryCount() >= 1);
+
+    } finally {
+      testZkClient.setAsyncCallRC(KeeperException.Code.OK.intValue());
+      testZkClient.close();
+      _zkClient.delete(NODE_PATH);
+    }
+  }
+
+  @Test(dependsOnMethods = "testAsyncRetryCustomizedCallbackCancel")
   public void testAsyncReadRetry() throws JMException {
     MockAsyncZkClient testZkClient = new MockAsyncZkClient(_zkServerAddress);
     try {