You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2020/03/02 19:57:34 UTC

[helix] branch async created (now 9685c17)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a change to branch async
in repository https://gitbox.apache.org/repos/asf/helix.git.


      at 9685c17  Async write operation should not throw Exception for serializing error.

This branch includes the following new commits:

     new 9685c17  Async write operation should not throw Exception for serializing error.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[helix] 01/01: Async write operation should not throw Exception for serializing error.

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch async
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 9685c1705832d1fe99d8f6b53b4da465c07f8a0d
Author: Jiajun Wang <jj...@linkedin.com>
AuthorDate: Tue Feb 25 14:56:01 2020 -0800

    Async write operation should not throw Exception for serializing error.
    
    This change will make the async write operations return error through the async callback instead of throwing exceptions. This change will fix the batch write/create failure due to one single node serializing failure.
---
 .../apache/helix/manager/zk/TestRawZkClient.java   | 52 ++++++++++++++++++
 .../apache/helix/zookeeper/zkclient/ZkClient.java  | 61 +++++++++++++---------
 2 files changed, 88 insertions(+), 25 deletions(-)

diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
index a713996..5d60af0 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
@@ -22,6 +22,7 @@ package org.apache.helix.manager.zk;
 import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -41,6 +42,8 @@ import org.apache.helix.monitoring.mbeans.MBeanRegistrar;
 import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
 import org.apache.helix.monitoring.mbeans.ZkClientMonitor;
 import org.apache.helix.monitoring.mbeans.ZkClientPathMonitor;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
 import org.apache.helix.zookeeper.zkclient.IZkDataListener;
 import org.apache.helix.zookeeper.zkclient.IZkStateListener;
 import org.apache.helix.zookeeper.zkclient.ZkConnection;
@@ -745,4 +748,53 @@ public class TestRawZkClient extends ZkUnitTestBase {
     // Recover zk server for later tests.
     _zkServer.start();
   }
+
+  @Test
+  public void testAsyncWriteOperations() {
+    ZkClient zkClient = new ZkClient(ZK_ADDR);
+    // TODO disable the compress here
+    zkClient.setZkSerializer(new ZNRecordSerializer());
+
+    ZNRecord oversizeZNRecord = new ZNRecord("Oversize");
+    StringBuilder sb = new StringBuilder(1204);
+    Random ran = new Random();
+    for (int i = 0; i < 1024; i++) {
+      sb.append(ran.nextInt(26) + 'a');
+    }
+    String buf = sb.toString();
+    for (int i = 0; i < 1024; i++) {
+      oversizeZNRecord.setSimpleField(Integer.toString(i), buf);
+    }
+
+    // ensure /tmp exists for the test
+    if (!zkClient.exists("/tmp")) {
+      zkClient.create("/tmp", null, CreateMode.PERSISTENT);
+    }
+
+    org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks.CreateCallbackHandler
+        createCallback =
+        new org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks.CreateCallbackHandler();
+    zkClient.asyncCreate( "/tmp/async", null, CreateMode.PERSISTENT, createCallback);
+    createCallback.waitForSuccess();
+    Assert.assertEquals(createCallback.getRc(), 0);
+
+    // try to create oversize node, should fail
+    zkClient.asyncCreate( "/tmp/asyncOversize", oversizeZNRecord, CreateMode.PERSISTENT, createCallback);
+    createCallback.waitForSuccess();
+    Assert.assertEquals(createCallback.getRc(), KeeperException.Code.MarshallingError);
+
+    ZNRecord normalZNRecord = new ZNRecord("normal");
+    normalZNRecord.setSimpleField("key", buf);
+
+    org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks.SetDataCallbackHandler
+        setDataCallbackHandler =
+        new org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks.SetDataCallbackHandler();
+    zkClient.asyncSetData( "/tmp/async", normalZNRecord, -1, setDataCallbackHandler);
+    createCallback.waitForSuccess();
+    Assert.assertEquals(setDataCallbackHandler.getRc(), 0);
+
+    zkClient.asyncSetData( "/tmp/async", oversizeZNRecord, -1, setDataCallbackHandler);
+    createCallback.waitForSuccess();
+    Assert.assertEquals(setDataCallbackHandler.getRc(), KeeperException.Code.MarshallingError);
+  }
 }
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
index 0507c3f..3424f06 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
@@ -26,19 +26,20 @@ import javax.management.JMException;
 
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.exception.ZkClientException;
+import org.apache.helix.zookeeper.zkclient.annotation.PreFetch;
+import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks;
 import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException;
 import org.apache.helix.zookeeper.zkclient.exception.ZkException;
 import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
+import org.apache.helix.zookeeper.zkclient.exception.ZkMarshallingError;
 import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException;
 import org.apache.helix.zookeeper.zkclient.exception.ZkNodeExistsException;
+import org.apache.helix.zookeeper.zkclient.exception.ZkSessionMismatchedException;
 import org.apache.helix.zookeeper.zkclient.exception.ZkTimeoutException;
-import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
-import org.apache.helix.zookeeper.zkclient.annotation.PreFetch;
+import org.apache.helix.zookeeper.zkclient.metric.ZkClientMonitor;
 import org.apache.helix.zookeeper.zkclient.serialize.BasicZkSerializer;
 import org.apache.helix.zookeeper.zkclient.serialize.PathBasedZkSerializer;
-import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks;
-import org.apache.helix.zookeeper.zkclient.exception.ZkSessionMismatchedException;
-import org.apache.helix.zookeeper.zkclient.metric.ZkClientMonitor;
+import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
 import org.apache.helix.zookeeper.zkclient.util.ExponentialBackoffStrategy;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -1720,17 +1721,22 @@ public class ZkClient implements Watcher {
   public void asyncCreate(final String path, Object datat, final CreateMode mode,
       final ZkAsyncCallbacks.CreateCallbackHandler cb) {
     final long startT = System.currentTimeMillis();
-    final byte[] data = (datat == null ? null : serialize(datat, path));
-    retryUntilConnected(new Callable<Object>() {
-      @Override
-      public Object call() throws Exception {
-        ((ZkConnection) getConnection()).getZookeeper()
-            .create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE,
-                // Arrays.asList(DEFAULT_ACL),
-                mode, cb, new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT,
-                    data == null ? 0 : data.length, false));
-        return null;
-      }
+    byte[] data = null;
+    try {
+      data = (datat == null ? null : serialize(datat, path));
+    } catch (ZkMarshallingError e) {
+      cb.processResult(KeeperException.Code.MARSHALLINGERROR.intValue(), path,
+          new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, false), null);
+      return;
+    }
+    final byte[] finalData = data;
+    retryUntilConnected(() -> {
+      ((ZkConnection) getConnection()).getZookeeper()
+          .create(path, finalData, ZooDefs.Ids.OPEN_ACL_UNSAFE,
+              // Arrays.asList(DEFAULT_ACL),
+              mode, cb, new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT,
+                  finalData == null ? 0 : finalData.length, false));
+      return null;
     });
   }
 
@@ -1738,15 +1744,20 @@ public class ZkClient implements Watcher {
   public void asyncSetData(final String path, Object datat, final int version,
       final ZkAsyncCallbacks.SetDataCallbackHandler cb) {
     final long startT = System.currentTimeMillis();
-    final byte[] data = serialize(datat, path);
-    retryUntilConnected(new Callable<Object>() {
-      @Override
-      public Object call() throws Exception {
-        ((ZkConnection) getConnection()).getZookeeper().setData(path, data, version, cb,
-            new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT,
-                data == null ? 0 : data.length, false));
-        return null;
-      }
+    byte[] data = null;
+    try {
+      data = serialize(datat, path);
+    } catch (ZkMarshallingError e) {
+      cb.processResult(KeeperException.Code.MARSHALLINGERROR.intValue(), path,
+          new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT, 0, false), null);
+      return;
+    }
+    final byte[] finalData = data;
+    retryUntilConnected(() -> {
+      ((ZkConnection) getConnection()).getZookeeper().setData(path, finalData, version, cb,
+          new ZkAsyncCallbacks.ZkAsyncCallContext(_monitor, startT,
+              finalData == null ? 0 : finalData.length, false));
+      return null;
     });
   }