You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2020/07/24 02:22:15 UTC
[helix] branch master updated: Add sync() call first for new
session handling (#1119)
This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 7aeae0d Add sync() call first for new session handling (#1119)
7aeae0d is described below
commit 7aeae0d36cd05886b34fb8583fd95cb831c48b72
Author: kaisun2000 <52...@users.noreply.github.com>
AuthorDate: Thu Jul 23 19:22:04 2020 -0700
Add sync() call first for new session handling (#1119)
Helix may see stale data when its session expires and it gets reconnected to
a slower ZK server. This can cause various correctness problems.
We now call sync() on the ZK server first. This ensures Helix does
not see data older than what it has already seen.
Co-authored-by: Kai Sun <ks...@ksun-mn1.linkedin.biz>
---
.../zookeeper/constant/ZkSystemPropertyKeys.java | 10 +++
.../apache/helix/zookeeper/zkclient/ZkClient.java | 82 +++++++++++++++++++++-
.../zkclient/callback/ZkAsyncCallbacks.java | 39 +++++++++-
.../zookeeper/impl/client/TestRawZkClient.java | 35 +++++++++
4 files changed, 162 insertions(+), 4 deletions(-)
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/ZkSystemPropertyKeys.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/ZkSystemPropertyKeys.java
index bd88c61..0153e59 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/ZkSystemPropertyKeys.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/constant/ZkSystemPropertyKeys.java
@@ -50,4 +50,14 @@ public class ZkSystemPropertyKeys {
*/
public static final String ZK_SERIALIZER_ZNRECORD_WRITE_SIZE_LIMIT_BYTES =
"zk.serializer.znrecord.write.size.limit.bytes";
+
+ /**
+ * This property determines whether ZkClient issues a sync() to the server when a new session
+ * is established.
+ *
+ * <p>
+ * The default value is "true" (issuing sync)
+ */
+ public static final String ZK_AUTOSYNC_ENABLED =
+ "zk.zkclient.autosync.enabled";
}
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
index 7a4829d..f1ddecb 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
@@ -21,11 +21,15 @@ import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.management.JMException;
import org.apache.helix.zookeeper.api.client.ChildrenSubscribeResult;
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.constant.ZkSystemPropertyKeys;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.exception.ZkClientException;
import org.apache.helix.zookeeper.zkclient.annotation.PreFetch;
@@ -46,6 +50,7 @@ import org.apache.helix.zookeeper.zkclient.serialize.BasicZkSerializer;
import org.apache.helix.zookeeper.zkclient.serialize.PathBasedZkSerializer;
import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
import org.apache.helix.zookeeper.zkclient.util.ExponentialBackoffStrategy;
+import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.ConnectionLossException;
@@ -81,6 +86,9 @@ public class ZkClient implements Watcher {
// TODO: remove it once we have a better way to exit retry for this case
private static final int NUM_CHILDREN_LIMIT;
+
+ private static final String ZK_AUTOSYNC_ENABLED_DEFAULT = "true";
+
private final IZkConnection _connection;
private final long _operationRetryTimeoutInMillis;
private final Map<String, Set<IZkChildListener>> _childListener = new ConcurrentHashMap<>();
@@ -118,6 +126,12 @@ public class ZkClient implements Watcher {
NUM_CHILDREN_LIMIT = 100 * 1000;
}
+ private static final boolean SYNC_ON_SESSION = Boolean.parseBoolean(
+ System.getProperty(ZkSystemPropertyKeys.ZK_AUTOSYNC_ENABLED, ZK_AUTOSYNC_ENABLED_DEFAULT));
+ ;
+
+ private static final String SYNC_PATH = "/";
+
private class IZkDataListenerEntry {
final IZkDataListener _dataListener;
final boolean _prefetchData;
@@ -200,6 +214,7 @@ public class ZkClient implements Watcher {
if (zkConnection == null) {
throw new NullPointerException("Zookeeper connection is null!");
}
+
_connection = zkConnection;
_pathBasedZkSerializer = zkSerializer;
_operationRetryTimeoutInMillis = operationRetryTimeout;
@@ -1256,21 +1271,82 @@ public class ZkClient implements Watcher {
}
}
+
+
+ private void doAsyncSync(final ZooKeeper zk, final String path, final long startT,
+ final ZkAsyncCallbacks.SyncCallbackHandler cb) {
+ zk.sync(path, cb,
+ new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, startT, 0, true) {
+ @Override
+ protected void doRetry() throws Exception {
+ doAsyncSync(zk, path, System.currentTimeMillis(), cb);
+ }
+ });
+ }
+
+ /*
+ * Note: issueSync() takes a ZooKeeper (client) object and passes it to doAsyncSync().
+ * The reason is that we want to ensure each new session event is preceded by exactly one
+ * sync() to the server. The sync() makes sure the client does not see stale data.
+ *
+ * A ZooKeeper client object has the invariant that each object has exactly one session. With
+ * this invariant we can achieve exactly one sync() to the server per new session. The reasoning:
+ * issueSync() is triggered by fireNewSessionEvents(), which runs under the eventLock of ZkClient.
+ * Thus the ZooKeeper object passed in is guaranteed to have the new incoming sessionId. If the
+ * session has expired by the time sync() is invoked, the sync() fails with a stale session.
+ * This is exactly what we want: the newer session will trigger another fireNewSessionEvents().
+ */
+ private boolean issueSync(ZooKeeper zk) {
+ String sessionId = Long.toHexString(zk.getSessionId());
+ ZkAsyncCallbacks.SyncCallbackHandler callbackHandler =
+ new ZkAsyncCallbacks.SyncCallbackHandler(sessionId);
+
+ final long startT = System.currentTimeMillis();
+ doAsyncSync(zk, SYNC_PATH, startT, callbackHandler);
+
+ callbackHandler.waitForSuccess();
+
+ KeeperException.Code code = KeeperException.Code.get(callbackHandler.getRc());
+ if (code == KeeperException.Code.OK) {
+ LOG.info("sycnOnNewSession with sessionID {} async return code: {} and proceeds", sessionId,
+ code);
+ return true;
+ }
+
+ // Not retryable error, including session expiration; return false.
+ return false;
+ }
+
private void fireNewSessionEvents() {
// only managing zkclient fire handleNewSession event
if (!isManagingZkConnection()) {
return;
}
final String sessionId = getHexSessionId();
- for (final IZkStateListener stateListener : _stateListener) {
- _eventThread.send(new ZkEventThread.ZkEvent("New session event sent to " + stateListener, sessionId) {
+ if (SYNC_ON_SESSION) {
+ final ZooKeeper zk = ((ZkConnection) getConnection()).getZookeeper();
+ _eventThread.send(new ZkEventThread.ZkEvent("Sync call before new session event of session " + sessionId,
+ sessionId) {
@Override
public void run() throws Exception {
- stateListener.handleNewSession(sessionId);
+ if (issueSync(zk) == false) {
+ LOG.warn("\"Failed to call sync() on new session {}", sessionId);
+ }
}
});
}
+
+ for (final IZkStateListener stateListener : _stateListener) {
+ _eventThread
+ .send(new ZkEventThread.ZkEvent("New session event sent to " + stateListener, sessionId) {
+
+ @Override
+ public void run() throws Exception {
+ stateListener.handleNewSession(sessionId);
+ }
+ });
+ }
}
protected void fireStateChangedEvent(final KeeperState state) {
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java
index 70dbab4..228bcae 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallbacks.java
@@ -22,10 +22,12 @@ package org.apache.helix.zookeeper.zkclient.callback;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.helix.zookeeper.zkclient.metric.ZkClientMonitor;
+import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.AsyncCallback.DataCallback;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.AsyncCallback.StringCallback;
import org.apache.zookeeper.AsyncCallback.VoidCallback;
+import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
@@ -120,6 +122,41 @@ public class ZkAsyncCallbacks {
}
}
+ public static class SyncCallbackHandler extends DefaultCallback implements AsyncCallback.VoidCallback {
+ private String _sessionId;
+
+ public SyncCallbackHandler(String sessionId) {
+ _sessionId = sessionId;
+ }
+
+ @Override
+ public void processResult(int rc, String path, Object ctx) {
+ LOG.debug("sync() call with sessionID {} async return code: {}", _sessionId, rc);
+ callback(rc, path, ctx);
+ }
+
+ @Override
+ public void handle() {
+ // Make compiler happy, not used.
+ }
+
+ @Override
+ protected boolean needRetry(int rc) {
+ try {
+ switch (KeeperException.Code.get(rc)) {
+ /** Connection to the server has been lost */
+ case CONNECTIONLOSS:
+ return true;
+ default:
+ return false;
+ }
+ } catch (ClassCastException | NullPointerException ex) {
+ LOG.error("Session {} failed to handle unknown return code {}. Skip retrying. ex {}",
+ _sessionId, rc, ex);
+ return false;
+ }
+ }
+ }
/**
* Default callback for zookeeper async api.
*/
@@ -215,7 +252,7 @@ public class ZkAsyncCallbacks {
* @param rc the return code
* @return true if the error is transient and the operation may succeed when being retried.
*/
- private boolean needRetry(int rc) {
+ protected boolean needRetry(int rc) {
try {
switch (Code.get(rc)) {
/** Connection to the server has been lost */
diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java
index 4669e86..84b2d7f 100644
--- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java
+++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java
@@ -559,6 +559,41 @@ public class TestRawZkClient extends ZkTestBase {
}
/*
+ * This test validates that when ZK_AUTOSYNC_ENABLED is enabled (the default), sync() is issued
+ * before handleNewSession, so that ZkClient does not see stale data.
+ */
+ @Test
+ public void testAutoSyncWithNewSessionEstablishment() throws Exception {
+ final String path = "/" + TestHelper.getTestMethodName();
+ final String data = "Hello Helix 2";
+
+ // Wait until the ZkClient has got a new session.
+ Assert.assertTrue(_zkClient.waitUntilConnected(1, TimeUnit.SECONDS));
+
+ final long originalSessionId = _zkClient.getSessionId();
+
+ try {
+ // Create node.
+ _zkClient.create(path, data, CreateMode.PERSISTENT);
+ } catch (Exception ex) {
+ Assert.fail("Failed to create ephemeral node.", ex);
+ }
+
+ // Expire the original session.
+ ZkTestHelper.expireSession(_zkClient);
+
+ // Verify the node is created and its data is correct.
+ Stat stat = new Stat();
+ String nodeData = null;
+ try {
+ nodeData = _zkClient.readData(path, stat, true);
+ } catch (ZkException e) {
+ Assert.fail("fail to read data");
+ }
+ Assert.assertEquals(nodeData, data, "Data is not correct.");
+ }
+
+ /*
* This test checks that ephemeral creation fails because the expected zk session does not match
* the actual zk session.
* How this test does is: