You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this copy.)
Posted to commits@helix.apache.org by ji...@apache.org on 2020/07/24 02:22:15 UTC

[helix] branch master updated: Add sync() call first for new session handling (#1119)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 7aeae0d  Add sync() call first for new session handling (#1119)
7aeae0d is described below

commit 7aeae0d36cd05886b34fb8583fd95cb831c48b72
Author: kaisun2000 <52...@users.noreply.github.com>
AuthorDate: Thu Jul 23 19:22:04 2020 -0700

    Add sync() call first for new session handling (#1119)
    
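    Helix may see stale data when its ZooKeeper session expires and the
    client reconnects to a ZooKeeper server that is lagging behind. This
    can cause various correctness problems. To prevent this, ZkClient now
    calls sync() on the ZooKeeper server first when a new session is
    established, which ensures Helix does not see data older than what it
    has already seen.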
    
    Co-authored-by: Kai Sun <ks...@ksun-mn1.linkedin.biz>
---
 .../zookeeper/constant/ZkSystemPropertyKeys.java   | 10 +++
 .../apache/helix/zookeeper/zkclient/ZkClient.java  | 82 +++++++++++++++++++++-
 .../zkclient/callback/ZkAsyncCallbacks.java        | 39 +++++++++-
 .../zookeeper/impl/client/TestRawZkClient.java     | 35 +++++++++
 4 files changed, 162 insertions(+), 4 deletions(-)

diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/ZkSystemPropertyKeys.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/ZkSystemPropertyKeys.java
index bd88c61..0153e59 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/ZkSystemPropertyKeys.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/ZkSystemPropertyKeys.java
@@ -50,4 +50,14 @@ public class ZkSystemPropertyKeys {
    */
   public static final String ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES =
       "zk.serializer.znrecord.write.size.limit.bytes";
+
+  /**
+   * This property determines the behavior of ZkClient issuing an sync() to server upon new session
+   * established.
+   *
+   * <p>
+   *   The default value is "true" (issuing sync)
+   */
+  public static final String ZK_AUTOSYNC_ENABLED =
+      "zk.zkclient.autosync.enabled";
 }
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
index 7a4829d..f1ddecb 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
@@ -21,11 +21,15 @@ import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import javax.management.JMException;
 
 import org.apache.helix.zookeeper.api.client.ChildrenSubscribeResult;
 import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.constant.ZkSystemPropertyKeys;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.exception.ZkClientException;
 import org.apache.helix.zookeeper.zkclient.annotation.PreFetch;
@@ -46,6 +50,7 @@ import org.apache.helix.zookeeper.zkclient.serialize.BasicZkSerializer;
 import org.apache.helix.zookeeper.zkclient.serialize.PathBasedZkSerializer;
 import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
 import org.apache.helix.zookeeper.zkclient.util.ExponentialBackoffStrategy;
+import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.ConnectionLossException;
@@ -81,6 +86,9 @@ public class ZkClient implements Watcher {
   // TODO: remove it once we have a better way to exit retry for this case
   private static final int NUM_CHILDREN_LIMIT;
 
+
+  private static final String ZK_AUTOSYNC_ENABLED_DEFAULT = "true";
+
   private final IZkConnection _connection;
   private final long _operationRetryTimeoutInMillis;
   private final Map<String, Set<IZkChildListener>> _childListener = new ConcurrentHashMap<>();
@@ -118,6 +126,12 @@ public class ZkClient implements Watcher {
     NUM_CHILDREN_LIMIT = 100 * 1000;
   }
 
+  private static final boolean SYNC_ON_SESSION = Boolean.parseBoolean(
+      System.getProperty(ZkSystemPropertyKeys.ZK_AUTOSYNC_ENABLED, ZK_AUTOSYNC_ENABLED_DEFAULT));
+  ;
+
+  private static final String SYNC_PATH = "/";
+
   private class IZkDataListenerEntry {
     final IZkDataListener _dataListener;
     final boolean _prefetchData;
@@ -200,6 +214,7 @@ public class ZkClient implements Watcher {
     if (zkConnection == null) {
       throw new NullPointerException("Zookeeper connection is null!");
     }
+
     _connection = zkConnection;
     _pathBasedZkSerializer = zkSerializer;
     _operationRetryTimeoutInMillis = operationRetryTimeout;
@@ -1256,21 +1271,82 @@ public class ZkClient implements Watcher {
     }
   }
 
+
+
+  private void doAsyncSync(final ZooKeeper zk, final String path, final long startT,
+      final ZkAsyncCallbacks.SyncCallbackHandler cb) {
+    zk.sync(path, cb,
+        new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, startT, 0, true) {
+          @Override
+          protected void doRetry() throws Exception {
+            doAsyncSync(zk, path, System.currentTimeMillis(), cb);
+          }
+        });
+  }
+
+  /*
+   *  Note, issueSync takes a ZooKeeper (client) object and pass it to doAsyncSync().
+   *  The reason we do this is that we want to ensure each new session event is preceded with exactly
+   *  one sync() to server. The sync() is to make sure the server would not see stale data.
+   *
+   *  ZooKeeper client object has an invariant of each object has one session. With this invariant
+   *  we can achieve each one sync() to server upon new session establishment. The reasoning is:
+   *  issueSync() is called when fireNewSessionEvents() which in under eventLock of ZkClient. Thus
+   *  we are guaranteed the ZooKeeper object passed in would have the new incoming sessionId. If by
+   *  the time sync() is invoked, the session expires. The sync() would fail with a stale session.
+   *  This is exactly what we want. The newer session would ensure another fireNewSessionEvents.
+   */
+  private boolean issueSync(ZooKeeper zk) {
+    String sessionId = Long.toHexString(zk.getSessionId());
+    ZkAsyncCallbacks.SyncCallbackHandler callbackHandler =
+        new ZkAsyncCallbacks.SyncCallbackHandler(sessionId);
+
+    final long startT = System.currentTimeMillis();
+    doAsyncSync(zk, SYNC_PATH, startT, callbackHandler);
+
+    callbackHandler.waitForSuccess();
+
+    KeeperException.Code code = KeeperException.Code.get(callbackHandler.getRc());
+    if (code == KeeperException.Code.OK) {
+      LOG.info("sycnOnNewSession with sessionID {} async return code: {} and proceeds", sessionId,
+          code);
+      return true;
+    }
+
+    // Not retryable error, including session expiration; return false.
+    return false;
+  }
+
   private void fireNewSessionEvents() {
     // only managing zkclient fire handleNewSession event
     if (!isManagingZkConnection()) {
       return;
     }
     final String sessionId = getHexSessionId();
-    for (final IZkStateListener stateListener : _stateListener) {
-      _eventThread.send(new ZkEventThread.ZkEvent("New session event sent to " + stateListener, sessionId) {
 
+    if (SYNC_ON_SESSION) {
+      final ZooKeeper zk = ((ZkConnection) getConnection()).getZookeeper();
+      _eventThread.send(new ZkEventThread.ZkEvent("Sync call before new session event of session " + sessionId,
+          sessionId) {
         @Override
         public void run() throws Exception {
-          stateListener.handleNewSession(sessionId);
+          if (issueSync(zk) == false) {
+            LOG.warn("\"Failed to call sync() on new session {}", sessionId);
+          }
         }
       });
     }
+
+    for (final IZkStateListener stateListener : _stateListener) {
+      _eventThread
+          .send(new ZkEventThread.ZkEvent("New session event sent to " + stateListener, sessionId) {
+
+            @Override
+            public void run() throws Exception {
+              stateListener.handleNewSession(sessionId);
+            }
+          });
+    }
   }
 
   protected void fireStateChangedEvent(final KeeperState state) {
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java
index 70dbab4..228bcae 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java
@@ -22,10 +22,12 @@ package org.apache.helix.zookeeper.zkclient.callback;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.helix.zookeeper.zkclient.metric.ZkClientMonitor;
+import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.AsyncCallback.DataCallback;
 import org.apache.zookeeper.AsyncCallback.StatCallback;
 import org.apache.zookeeper.AsyncCallback.StringCallback;
 import org.apache.zookeeper.AsyncCallback.VoidCallback;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
@@ -120,6 +122,41 @@ public class ZkAsyncCallbacks {
     }
   }
 
+  public static class SyncCallbackHandler extends DefaultCallback implements AsyncCallback.VoidCallback {
+    private String _sessionId;
+
+    public SyncCallbackHandler(String sessionId) {
+      _sessionId = sessionId;
+    }
+
+    @Override
+    public void processResult(int rc, String path, Object ctx) {
+      LOG.debug("sync() call with sessionID {} async return code: {}", _sessionId, rc);
+      callback(rc, path, ctx);
+    }
+
+    @Override
+    public void handle() {
+      // Make compiler happy, not used.
+    }
+
+    @Override
+    protected boolean needRetry(int rc) {
+      try {
+        switch (KeeperException.Code.get(rc)) {
+          /** Connection to the server has been lost */
+          case CONNECTIONLOSS:
+            return true;
+          default:
+            return false;
+        }
+      } catch (ClassCastException | NullPointerException ex) {
+        LOG.error("Session {} failed to handle unknown return code {}. Skip retrying. ex {}",
+            _sessionId, rc, ex);
+        return false;
+      }
+    }
+  }
   /**
    * Default callback for zookeeper async api.
    */
@@ -215,7 +252,7 @@ public class ZkAsyncCallbacks {
      * @param rc the return code
      * @return true if the error is transient and the operation may succeed when being retried.
      */
-    private boolean needRetry(int rc) {
+    protected boolean needRetry(int rc) {
       try {
         switch (Code.get(rc)) {
         /** Connection to the server has been lost */
diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java
index 4669e86..84b2d7f 100644
--- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java
+++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java
@@ -559,6 +559,41 @@ public class TestRawZkClient extends ZkTestBase {
   }
 
   /*
+   * This test validates that when ZK_AUTOSYNC_ENABLED_DEFAULT is enabled, sync() would be issued
+   * before handleNewSession. ZKclient would not see stale data.
+   */
+  @Test
+  public void testAutoSyncWithNewSessionEstablishment() throws Exception {
+    final String path = "/" + TestHelper.getTestMethodName();
+    final String data = "Hello Helix 2";
+
+    // Wait until the ZkClient has got a new session.
+    Assert.assertTrue(_zkClient.waitUntilConnected(1, TimeUnit.SECONDS));
+
+    final long originalSessionId = _zkClient.getSessionId();
+
+    try {
+      // Create node.
+      _zkClient.create(path, data, CreateMode.PERSISTENT);
+    } catch (Exception ex) {
+      Assert.fail("Failed to create ephemeral node.", ex);
+    }
+
+    // Expire the original session.
+    ZkTestHelper.expireSession(_zkClient);
+
+    // Verify the node is created and its data is correct.
+    Stat stat = new Stat();
+    String nodeData = null;
+    try {
+       nodeData = _zkClient.readData(path, stat, true);
+    } catch (ZkException e) {
+      Assert.fail("fail to read data");
+    }
+    Assert.assertEquals(nodeData, data, "Data is not correct.");
+  }
+
+  /*
    * This test checks that ephemeral creation fails because the expected zk session does not match
    * the actual zk session.
    * How this test does is: