You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by xy...@apache.org on 2023/03/10 00:28:38 UTC
[helix] branch metaclient updated: Add support for state change in ZkMetaClient
This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch metaclient
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/metaclient by this push:
new 6ff608a9c Add support for state change in ZkMetaClient
6ff608a9c is described below
commit 6ff608a9c6ec7d72a3abe5bbadbf5ebb835662b5
Author: xyuanlu <xy...@gmail.com>
AuthorDate: Thu Mar 9 16:28:31 2023 -0800
Add support for state change in ZkMetaClient
Add support for state change in ZkMetaClient
---
.../helix/metaclient/api/MetaClientInterface.java | 15 +++--
.../helix/metaclient/impl/zk/ZkMetaClient.java | 6 +-
.../zk/adapter/StateChangeListenerAdapter.java | 76 ++++++++++++++++++++++
.../metaclient/impl/zk/util/ZkMetaClientUtil.java | 26 +++++++-
.../helix/metaclient/impl/zk/TestZkMetaClient.java | 51 ++++++++++-----
.../helix/zookeeper/zkclient/IZkStateListener.java | 4 ++
.../apache/helix/zookeeper/zkclient/ZkClient.java | 9 +--
7 files changed, 158 insertions(+), 29 deletions(-)
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java
index 433184de6..7445a0d33 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java
@@ -56,6 +56,9 @@ public interface MetaClientInterface<T> {
}
enum ConnectState {
+ // Client is not connected to server. Before initiating connection or after close.
+ NOT_CONNECTED,
+
// Client is connected to server
CONNECTED,
@@ -65,11 +68,15 @@ public interface MetaClientInterface<T> {
// Server has expired this connection.
EXPIRED,
- // When client failed to connect server.
- INIT_FAILED,
-
// When client explicitly call disconnect.
- CLOSED_BY_CLIENT
+ CLOSED_BY_CLIENT,
+
+ // Connection between client and server is lost.
+ DISCONNECTED,
+
+ // Client is authenticated. They can perform operation with authorized permissions.
+ // This state is not in use as of now.
+ AUTHENTICATED
}
/**
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
index 7dedd02d5..7934fee8a 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
@@ -36,6 +36,7 @@ import org.apache.helix.metaclient.api.OpResult;
import org.apache.helix.metaclient.exception.MetaClientException;
import org.apache.helix.metaclient.impl.zk.adapter.DataListenerAdapter;
import org.apache.helix.metaclient.impl.zk.adapter.DirectChildListenerAdapter;
+import org.apache.helix.metaclient.impl.zk.adapter.StateChangeListenerAdapter;
import org.apache.helix.metaclient.impl.zk.adapter.ZkMetaClientCreateCallbackHandler;
import org.apache.helix.metaclient.impl.zk.adapter.ZkMetaClientDeleteCallbackHandler;
import org.apache.helix.metaclient.impl.zk.adapter.ZkMetaClientExistCallbackHandler;
@@ -285,7 +286,8 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
@Override
public boolean subscribeStateChanges(ConnectStateChangeListener listener) {
- return false;
+ _zkClient.subscribeStateChanges(new StateChangeListenerAdapter(listener));
+ return true;
}
@Override
@@ -310,7 +312,7 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
@Override
public void unsubscribeConnectStateChanges(ConnectStateChangeListener listener) {
-
+ // Remove (not add) the listener from the underlying ZkClient. A fresh adapter is enough to
+ // identify the registered one because StateChangeListenerAdapter defines equals/hashCode in
+ // terms of the wrapped listener.
+ _zkClient.unsubscribeStateChanges(new StateChangeListenerAdapter(listener));
}
@Override
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/StateChangeListenerAdapter.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/StateChangeListenerAdapter.java
new file mode 100644
index 000000000..8ad324bb2
--- /dev/null
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/StateChangeListenerAdapter.java
@@ -0,0 +1,76 @@
+package org.apache.helix.metaclient.impl.zk.adapter;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.metaclient.api.ConnectStateChangeListener;
+import org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil;
+import org.apache.helix.zookeeper.zkclient.IZkStateListener;
+import org.apache.zookeeper.Watcher;
+
+
+ /**
+  * Adapts a MetaClient {@link ConnectStateChangeListener} to the ZkClient
+  * {@link IZkStateListener} interface so that it can be registered for ZooKeeper state changes.
+  *
+  * <p>Equality and hash code are delegated to the wrapped listener, so two adapters created
+  * around the same listener compare equal.
+  */
+ public class StateChangeListenerAdapter implements IZkStateListener {
+   private final ConnectStateChangeListener _listener;
+
+   /**
+    * @param listener the MetaClient listener that receives the translated notifications
+    */
+   public StateChangeListenerAdapter(ConnectStateChangeListener listener) {
+     _listener = listener;
+   }
+
+   /**
+    * Not supported by this adapter: the wrapped listener needs both the previous and the current
+    * state, which only {@link #handleStateChanged(Watcher.Event.KeeperState,
+    * Watcher.Event.KeeperState)} provides.
+    *
+    * @throws UnsupportedOperationException always
+    */
+   @Override
+   public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
+     throw new UnsupportedOperationException(
+         "handleStateChanged(KeeperState) is not supported; "
+             + "use handleStateChanged(prevState, curState) instead");
+   }
+
+   @Override
+   public void handleNewSession(String sessionId) throws Exception {
+     // Intentionally a no-op: this callback is not forwarded to the wrapped listener because
+     // 'handleStateChanged' already covers state change notification for new connection
+     // establishment.
+   }
+
+   /**
+    * Forwards a session establishment failure to the wrapped listener.
+    *
+    * @param error the error reported by ZkClient
+    */
+   @Override
+   public void handleSessionEstablishmentError(Throwable error) throws Exception {
+     _listener.handleConnectionEstablishmentError(error);
+   }
+
+   /**
+    * Translates both ZooKeeper states to MetaClient connect states and notifies the wrapped
+    * listener.
+    *
+    * @param prevState the state before the change
+    * @param curState the state after the change
+    */
+   @Override
+   public void handleStateChanged(Watcher.Event.KeeperState prevState,
+       Watcher.Event.KeeperState curState) throws Exception {
+     _listener.handleConnectStateChanged(
+         ZkMetaClientUtil.translateKeeperStateToMetaClientConnectState(prevState),
+         ZkMetaClientUtil.translateKeeperStateToMetaClientConnectState(curState));
+   }
+
+   /** Two adapters are equal when they wrap equal listeners. */
+   @Override
+   public boolean equals(Object o) {
+     if (this == o) {
+       return true;
+     }
+     if (o == null || getClass() != o.getClass()) {
+       return false;
+     }
+     StateChangeListenerAdapter that = (StateChangeListenerAdapter) o;
+     return _listener.equals(that._listener);
+   }
+
+   /** Consistent with {@link #equals(Object)}: the hash code of the wrapped listener. */
+   @Override
+   public int hashCode() {
+     return _listener.hashCode();
+   }
+ }
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/util/ZkMetaClientUtil.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/util/ZkMetaClientUtil.java
index a733ba4fb..a9bee4cbb 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/util/ZkMetaClientUtil.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/util/ZkMetaClientUtil.java
@@ -42,6 +42,7 @@ import org.apache.helix.zookeeper.zkclient.exception.ZkTimeoutException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
+import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.server.EphemeralType;
@@ -52,7 +53,7 @@ public class ZkMetaClientUtil {
private static final List<ACL> DEFAULT_ACL =
Collections.unmodifiableList(ZooDefs.Ids.OPEN_ACL_UNSAFE);
- private ZkMetaClientUtil(){
+ private ZkMetaClientUtil() {
}
/**
@@ -239,6 +240,29 @@ public class ZkMetaClientUtil {
return new MetaClientException(e);
}
+ /**
+  * Translates a ZooKeeper {@link Watcher.Event.KeeperState} into the corresponding MetaClient
+  * connect state.
+  *
+  * @param keeperState the ZooKeeper state to translate; {@code null} is mapped to
+  *                    {@code NOT_CONNECTED}
+  * @return the matching {@link MetaClientInterface.ConnectState}
+  * @throws IllegalArgumentException if the given state has no MetaClient counterpart
+  */
+ public static MetaClientInterface.ConnectState translateKeeperStateToMetaClientConnectState(
+     Watcher.Event.KeeperState keeperState) {
+   if (keeperState == null) {
+     return MetaClientInterface.ConnectState.NOT_CONNECTED;
+   }
+   switch (keeperState) {
+     case AuthFailed:
+       return MetaClientInterface.ConnectState.AUTH_FAILED;
+     case Closed:
+       return MetaClientInterface.ConnectState.CLOSED_BY_CLIENT;
+     case Disconnected:
+       return MetaClientInterface.ConnectState.DISCONNECTED;
+     case Expired:
+       return MetaClientInterface.ConnectState.EXPIRED;
+     case SaslAuthenticated:
+       return MetaClientInterface.ConnectState.AUTHENTICATED;
+     // Both read-write and read-only connections are reported as CONNECTED.
+     case SyncConnected:
+     case ConnectedReadOnly:
+       return MetaClientInterface.ConnectState.CONNECTED;
+     default:
+       throw new IllegalArgumentException(keeperState + " is not a supported state.");
+   }
+ }
+
/**
* This function translate and group Zk exception code to metaclient code.
* It currently includes all ZK code on 3.6.3.
diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
index f23ed0f03..49cbae1f3 100644
--- a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
+++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
@@ -19,43 +19,29 @@ package org.apache.helix.metaclient.impl.zk;
* under the License.
*/
-import java.io.File;
-import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
-import java.util.Set;
-
-import org.apache.helix.metaclient.api.DataUpdater;
-import org.apache.helix.metaclient.api.MetaClientInterface;
-import org.apache.helix.metaclient.exception.MetaClientException;
-import java.util.HashMap;
import java.util.Map;
import java.util.Set;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.helix.metaclient.api.DataUpdater;
-import org.apache.helix.metaclient.api.DirectChildChangeListener;
-import org.apache.helix.metaclient.api.MetaClientInterface;
-import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.NotImplementedException;
+import org.apache.helix.metaclient.api.ConnectStateChangeListener;
import org.apache.helix.metaclient.api.DataChangeListener;
+import org.apache.helix.metaclient.api.DataUpdater;
+import org.apache.helix.metaclient.api.DirectChildChangeListener;
+import org.apache.helix.metaclient.api.MetaClientInterface;
import org.apache.helix.metaclient.api.Op;
import org.apache.helix.metaclient.api.OpResult;
import org.apache.helix.metaclient.exception.MetaClientException;
import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
-import org.apache.helix.zookeeper.zkclient.IDefaultNameSpace;
-import org.apache.helix.zookeeper.zkclient.ZkServer;
import org.apache.zookeeper.KeeperException;
import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.apache.helix.metaclient.api.MetaClientInterface.EntryMode.CONTAINER;
@@ -334,6 +320,35 @@ public class TestZkMetaClient extends ZkMetaClientTestBase{
}
}
+ /**
+  * Verifies that a subscribed {@link ConnectStateChangeListener} is notified of the
+  * NOT_CONNECTED to CONNECTED transition when the client connects.
+  */
+ @Test
+ public void testConnectStateChangeListener() throws Exception {
+   try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
+     CountDownLatch countDownLatch = new CountDownLatch(1);
+     // Index 0 holds the previous state, index 1 the current state of the notification.
+     final MetaClientInterface.ConnectState[] connectState =
+         new MetaClientInterface.ConnectState[2];
+     ConnectStateChangeListener listener = new ConnectStateChangeListener() {
+       @Override
+       public void handleConnectStateChanged(MetaClientInterface.ConnectState prevState,
+           MetaClientInterface.ConnectState currentState) throws Exception {
+         connectState[0] = prevState;
+         connectState[1] = currentState;
+         countDownLatch.countDown();
+       }
+
+       @Override
+       public void handleConnectionEstablishmentError(Throwable error) throws Exception {
+         // Not exercised by this test.
+       }
+     };
+     Assert.assertTrue(zkMetaClient.subscribeStateChanges(listener));
+     zkMetaClient.connect();
+     // Fail fast and explicitly if no notification arrives, instead of waiting 5000 seconds
+     // and then asserting on states that were never set.
+     Assert.assertTrue(countDownLatch.await(5000, TimeUnit.MILLISECONDS),
+         "Timed out waiting for connect state change notification");
+     Assert.assertEquals(connectState[0], MetaClientInterface.ConnectState.NOT_CONNECTED);
+     Assert.assertEquals(connectState[1], MetaClientInterface.ConnectState.CONNECTED);
+   }
+ }
+
/**
* Transactional op calls zk.multi() with a set of ops (operations)
* and the return values are converted into metaclient opResults.
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkStateListener.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkStateListener.java
index 5970e623d..f2c4190de 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkStateListener.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkStateListener.java
@@ -58,4 +58,8 @@ public interface IZkStateListener {
* On any error.
*/
void handleSessionEstablishmentError(final Throwable error) throws Exception;
+
+ /**
+ * Called when the zookeeper connection state has changed, with both the previous and the
+ * current state. The default implementation ignores prevState and delegates to the
+ * single-argument handleStateChanged with curState; implementations that need the previous
+ * state override this method.
+ *
+ * @param prevState The state before the change.
+ * @param curState The state after the change.
+ * @throws Exception On any error.
+ */
+ default void handleStateChanged(KeeperState prevState, KeeperState curState) throws Exception {
+ handleStateChanged(curState);
+ }
}
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
index 31fe4e97a..51904ede6 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
@@ -1486,12 +1486,13 @@ public class ZkClient implements Watcher {
protected void processStateChanged(WatchedEvent event) {
LOG.info("zkclient {}, zookeeper state changed ( {} )", _uid, event.getState());
+ KeeperState prevState = _currentState;
setCurrentState(event.getState());
if (getShutdownTrigger()) {
return;
}
- fireStateChangedEvent(event.getState());
+ fireStateChangedEvent(prevState, event.getState());
/*
* Note, the intention is that only the ZkClient managing the session would do auto reconnect
@@ -1660,15 +1661,15 @@ public class ZkClient implements Watcher {
}
}
- protected void fireStateChangedEvent(final KeeperState state) {
+ protected void fireStateChangedEvent(final KeeperState prevState, final KeeperState curState) {
final String sessionId = getHexSessionId();
for (final IZkStateListener stateListener : _stateListener) {
- final String description = "State changed to " + state + " sent to " + stateListener;
+ final String description = "State changed to " + curState + " sent to " + stateListener;
_eventThread.send(new ZkEventThread.ZkEvent(description, sessionId) {
@Override
public void run() throws Exception {
- stateListener.handleStateChanged(state);
+ stateListener.handleStateChanged(prevState, curState);
}
});
}