Posted to reviews@helix.apache.org by GitBox <gi...@apache.org> on 2020/07/16 18:05:40 UTC

[GitHub] [helix] dasahcc commented on a change in pull request #1119: Add sync() call first for new session handling

dasahcc commented on a change in pull request #1119:
URL: https://github.com/apache/helix/pull/1119#discussion_r455967004



##########
File path: zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java
##########
@@ -120,6 +122,41 @@ public void handle() {
     }
   }
 
+  public static class SyncCallbackHandler extends DefaultCallback implements AsyncCallback.VoidCallback {
+    private String _sessionId;
+
+    public SyncCallbackHandler(String sessionId) {
+      _sessionId = sessionId;
+    }
+
+    @Override
+    public void processResult(int rc, String path, Object ctx) {
+      LOG.info("sycnOnNewSession with sessionID {} async return code: {}", _sessionId, rc);
+      callback(rc, path, ctx);
+    }
+
+    @Override
+    public void handle() {
+      // Make compiler happy, not used.
+    }
+
+    @Override
+    protected boolean needRetry(int rc) {
+      try {
+        switch (KeeperException.Code.get(rc)) {
+          /** Connection to the server has been lost */
+          case CONNECTIONLOSS:
+            return true;
+          default:
+            return false;

Review comment:
       Will sync ever need to be retried on any return code other than CONNECTIONLOSS? If not, a plain "if" statement would be much simpler than this switch.
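
   A minimal sketch of the simpler form, assuming CONNECTIONLOSS stays the only retryable code. To keep it self-contained it does not depend on the ZooKeeper jar: the class name SyncRetrySketch is hypothetical, and the constant mirrors the integer value of KeeperException.Code.CONNECTIONLOSS (-4). In the real handler the comparison would presumably go against KeeperException.Code.CONNECTIONLOSS.intValue() instead.

```java
class SyncRetrySketch {
  // Mirrors KeeperException.Code.CONNECTIONLOSS.intValue() in ZooKeeper; hard-coded
  // here only so the sketch compiles without the ZooKeeper dependency.
  static final int CONNECTIONLOSS = -4;

  // Retry only when the connection to the server was lost; any other code is final.
  static boolean needRetry(int rc) {
    return rc == CONNECTIONLOSS;
  }

  public static void main(String[] args) {
    System.out.println(needRetry(-4));   // connection loss
    System.out.println(needRetry(0));    // OK
    System.out.println(needRetry(-112)); // session expired
  }
}
```

   If more retryable codes are expected later, the switch is easier to extend, so this is a trade-off rather than a strict improvement.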

##########
File path: zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
##########
@@ -1221,21 +1238,85 @@ private void reconnect() {
     }
   }
 
+
+
+  private void doAsyncSync(final ZooKeeper zk, final String path, final long startT,
+      final ZkAsyncCallbacks.SyncCallbackHandler cb) {
+    zk.sync(path, cb,
+        new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, startT, 0, true) {
+          @Override
+          protected void doRetry() throws Exception {
+            doAsyncSync(zk, path, System.currentTimeMillis(), cb);
+          }
+        });
+  }
+
+  /*
+   *  Note, retrySync takes a ZooKeeper (client) object and pass it to doAsyncSync().
+   *  The reason we do this is that we want to ensure each new session event is preceded with exactly
+   *  one sync() to server. The sync() is to make sure the server would not see stale data.
+   *
+   *  ZooKeeper client object has an invariant of each object has one session. With this invariant
+   *  we can achieve each one sync() to server upon new session establishment. The reasoning is:
+   *  retrySync() is called when fireNewSessionEvents() which in under eventLock of ZkClient. Thus
+   *  we are guaranteed the ZooKeeper object passed in would have the new incoming sessionId. If by
+   *  the time sync() is invoked, the session expires. The sync() would fail with a stale session.
+   *  This is exactly what we want. The newer session would ensure another fireNewSessionEvents.
+   */
+  private boolean retrySync(String sessionId, ZooKeeper zk) throws ZkInterruptedException {
+    ZkAsyncCallbacks.SyncCallbackHandler callbackHandler =
+        new ZkAsyncCallbacks.SyncCallbackHandler(sessionId);
+
+    final long startT = System.currentTimeMillis();
+    doAsyncSync(zk, _syncPath, startT, callbackHandler);
+
+    callbackHandler.waitForSuccess();
+
+    KeeperException.Code code = KeeperException.Code.get(callbackHandler.getRc());
+    if (code == OK) {
+      LOG.info("sycnOnNewSession with sessionID {} async return code: {} and proceeds", sessionId,
+          code);
+      return true;
+    }
+
+    // Not retryable error, including session expiration; Log the error and return
+    LOG.warn(
+        "sycnOnNewSession with sessionID {} async return code: {} and not retryable, stop calling handleNewSession",
+        sessionId, code);
+    return false;
+  }
+
   private void fireNewSessionEvents() {
     // only managing zkclient fire handleNewSession event
     if (!isManagingZkConnection()) {
       return;
     }
     final String sessionId = getHexSessionId();
-    for (final IZkStateListener stateListener : _stateListener) {
-      _eventThread.send(new ZkEventThread.ZkEvent("New session event sent to " + stateListener, sessionId) {
 
+    if (_syncOnNewSession) {
+      final ZkConnection zkConnection = (ZkConnection) getConnection();

Review comment:
       The only use of zkConnection is to get the ZooKeeper object. Why not combine the two into one line:
   final ZooKeeper zk = ((ZkConnection) getConnection()).getZookeeper();

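
   In Java a cast binds more loosely than a method call, so the one-liner needs parentheses around the cast before getZookeeper() is invoked. A small sketch of that, using hypothetical stand-in types (Connection, ConcreteConnection, Handle) rather than the real Helix and ZooKeeper classes, and assuming the getter is declared only on the concrete class, as the cast in the diff suggests:

```java
class CastPrecedenceSketch {
  // Hypothetical stand-ins for the connection interface, its concrete class,
  // and the object the concrete class hands out.
  interface Connection {}

  static class Handle {}

  static class ConcreteConnection implements Connection {
    Handle getHandle() {
      return new Handle();
    }
  }

  static Connection getConnection() {
    return new ConcreteConnection();
  }

  static Handle handleOf() {
    // Without the inner parentheses the cast would apply to the result of
    // getHandle(), and the call would not compile because Connection
    // declares no such method.
    return ((ConcreteConnection) getConnection()).getHandle();
  }

  public static void main(String[] args) {
    System.out.println(handleOf() != null);
  }
}
```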

##########
File path: zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
##########
@@ -1221,21 +1238,85 @@ private void reconnect() {
     }
   }
 
+
+
+  private void doAsyncSync(final ZooKeeper zk, final String path, final long startT,
+      final ZkAsyncCallbacks.SyncCallbackHandler cb) {
+    zk.sync(path, cb,
+        new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, startT, 0, true) {
+          @Override
+          protected void doRetry() throws Exception {
+            doAsyncSync(zk, path, System.currentTimeMillis(), cb);
+          }
+        });
+  }
+
+  /*
+   *  Note, retrySync takes a ZooKeeper (client) object and pass it to doAsyncSync().
+   *  The reason we do this is that we want to ensure each new session event is preceded with exactly
+   *  one sync() to server. The sync() is to make sure the server would not see stale data.
+   *
+   *  ZooKeeper client object has an invariant of each object has one session. With this invariant
+   *  we can achieve each one sync() to server upon new session establishment. The reasoning is:
+   *  retrySync() is called when fireNewSessionEvents() which in under eventLock of ZkClient. Thus
+   *  we are guaranteed the ZooKeeper object passed in would have the new incoming sessionId. If by
+   *  the time sync() is invoked, the session expires. The sync() would fail with a stale session.
+   *  This is exactly what we want. The newer session would ensure another fireNewSessionEvents.
+   */
+  private boolean retrySync(String sessionId, ZooKeeper zk) throws ZkInterruptedException {

Review comment:
       Let's rename it. We don't retry here right now, so the name could confuse readers.

##########
File path: zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
##########
@@ -1221,21 +1238,85 @@ private void reconnect() {
     }
   }
 
+
+
+  private void doAsyncSync(final ZooKeeper zk, final String path, final long startT,
+      final ZkAsyncCallbacks.SyncCallbackHandler cb) {
+    zk.sync(path, cb,
+        new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, startT, 0, true) {
+          @Override
+          protected void doRetry() throws Exception {
+            doAsyncSync(zk, path, System.currentTimeMillis(), cb);
+          }
+        });
+  }
+
+  /*
+   *  Note, retrySync takes a ZooKeeper (client) object and pass it to doAsyncSync().
+   *  The reason we do this is that we want to ensure each new session event is preceded with exactly

Review comment:
       If there are many connects/disconnects, could zk.sync be called multiple times at the same time? Would that cause problems?

   Since the ZooKeeper event thread does not dedup events, are we just assuming the sync call will be short?



