You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by xy...@apache.org on 2023/03/10 00:28:38 UTC

[helix] branch metaclient updated: Add support for state change in ZkMetaClient

This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch metaclient
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/metaclient by this push:
     new 6ff608a9c Add support for state change in ZkMetaClient
6ff608a9c is described below

commit 6ff608a9c6ec7d72a3abe5bbadbf5ebb835662b5
Author: xyuanlu <xy...@gmail.com>
AuthorDate: Thu Mar 9 16:28:31 2023 -0800

    Add support for state change in ZkMetaClient
    
    Add support for state change in ZkMetaClient
---
 .../helix/metaclient/api/MetaClientInterface.java  | 15 +++--
 .../helix/metaclient/impl/zk/ZkMetaClient.java     |  6 +-
 .../zk/adapter/StateChangeListenerAdapter.java     | 76 ++++++++++++++++++++++
 .../metaclient/impl/zk/util/ZkMetaClientUtil.java  | 26 +++++++-
 .../helix/metaclient/impl/zk/TestZkMetaClient.java | 51 ++++++++++-----
 .../helix/zookeeper/zkclient/IZkStateListener.java |  4 ++
 .../apache/helix/zookeeper/zkclient/ZkClient.java  |  9 +--
 7 files changed, 158 insertions(+), 29 deletions(-)

diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java
index 433184de6..7445a0d33 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/api/MetaClientInterface.java
@@ -56,6 +56,9 @@ public interface MetaClientInterface<T> {
   }
 
   enum ConnectState {
+    // Client is not connected to server. Before initiating connection or after close.
+    NOT_CONNECTED,
+
     // Client is connected to server
     CONNECTED,
 
@@ -65,11 +68,15 @@ public interface MetaClientInterface<T> {
     // Server has expired this connection.
     EXPIRED,
 
-    // When client failed to connect server.
-    INIT_FAILED,
-
     // When client explicitly call disconnect.
-    CLOSED_BY_CLIENT
+    CLOSED_BY_CLIENT,
+
+    // Connection between client and server is lost.
+    DISCONNECTED,
+
+    // Client is authenticated and can perform operations with its authorized permissions.
+    // This state is not in use as of now.
+    AUTHENTICATED
   }
 
   /**
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
index 7dedd02d5..7934fee8a 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
@@ -36,6 +36,7 @@ import org.apache.helix.metaclient.api.OpResult;
 import org.apache.helix.metaclient.exception.MetaClientException;
 import org.apache.helix.metaclient.impl.zk.adapter.DataListenerAdapter;
 import org.apache.helix.metaclient.impl.zk.adapter.DirectChildListenerAdapter;
+import org.apache.helix.metaclient.impl.zk.adapter.StateChangeListenerAdapter;
 import org.apache.helix.metaclient.impl.zk.adapter.ZkMetaClientCreateCallbackHandler;
 import org.apache.helix.metaclient.impl.zk.adapter.ZkMetaClientDeleteCallbackHandler;
 import org.apache.helix.metaclient.impl.zk.adapter.ZkMetaClientExistCallbackHandler;
@@ -285,7 +286,8 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
 
   @Override
   public boolean subscribeStateChanges(ConnectStateChangeListener listener) {
-    return false;
+    _zkClient.subscribeStateChanges(new StateChangeListenerAdapter(listener));
+    return true; // always reports success after registering the adapter
   }
 
   @Override
@@ -310,7 +312,7 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
 
   @Override
   public void unsubscribeConnectStateChanges(ConnectStateChangeListener listener) {
-
+    _zkClient.unsubscribeStateChanges(new StateChangeListenerAdapter(listener));
   }
 
   @Override
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/StateChangeListenerAdapter.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/StateChangeListenerAdapter.java
new file mode 100644
index 000000000..8ad324bb2
--- /dev/null
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/StateChangeListenerAdapter.java
@@ -0,0 +1,76 @@
+package org.apache.helix.metaclient.impl.zk.adapter;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.metaclient.api.ConnectStateChangeListener;
+import org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil;
+import org.apache.helix.zookeeper.zkclient.IZkStateListener;
+import org.apache.zookeeper.Watcher;
+
+
+public class StateChangeListenerAdapter implements IZkStateListener {
+  private final ConnectStateChangeListener _listener; // wrapped MetaClient listener
+
+  public StateChangeListenerAdapter(ConnectStateChangeListener listener) {
+    _listener = listener;
+  }
+
+  @Override
+  public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
+    throw new UnsupportedOperationException(); // use the (prevState, curState) overload
+  }
+
+  @Override
+  public void handleNewSession(String sessionId) throws Exception {
+    // This function will be invoked when connection is established. It is a no-op for metaclient:
+    // MetaClient does not expose this to user, as 'handleStateChanged' already covers the state
+    // change notification for new connection establishment.
+  }
+
+  @Override
+  public void handleSessionEstablishmentError(Throwable error) throws Exception {
+      _listener.handleConnectionEstablishmentError(error);
+  }
+
+  @Override
+  public void handleStateChanged(Watcher.Event.KeeperState prevState,
+      Watcher.Event.KeeperState curState) throws Exception {
+    _listener.handleConnectStateChanged(
+        ZkMetaClientUtil.translateKeeperStateToMetaClientConnectState(prevState),
+        ZkMetaClientUtil.translateKeeperStateToMetaClientConnectState(curState));
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    StateChangeListenerAdapter that = (StateChangeListenerAdapter) o;
+    return _listener.equals(that._listener); // equal iff the wrapped listeners are equal
+  }
+
+  @Override
+  public int hashCode() {
+    return _listener.hashCode();
+  }
+}
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/util/ZkMetaClientUtil.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/util/ZkMetaClientUtil.java
index a733ba4fb..a9bee4cbb 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/util/ZkMetaClientUtil.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/util/ZkMetaClientUtil.java
@@ -42,6 +42,7 @@ import org.apache.helix.zookeeper.zkclient.exception.ZkTimeoutException;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Op;
+import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.server.EphemeralType;
@@ -52,7 +53,7 @@ public class ZkMetaClientUtil {
   private static final List<ACL> DEFAULT_ACL =
       Collections.unmodifiableList(ZooDefs.Ids.OPEN_ACL_UNSAFE);
 
-  private ZkMetaClientUtil(){
+  private ZkMetaClientUtil() { // private: no instantiation from outside this class
   }
 
   /**
@@ -239,6 +240,29 @@ public class ZkMetaClientUtil {
     return new MetaClientException(e);
   }
 
+  public static MetaClientInterface.ConnectState translateKeeperStateToMetaClientConnectState(
+      Watcher.Event.KeeperState keeperState) {
+    if (keeperState == null) // presumably no ZK state was observed yet; reported as NOT_CONNECTED
+      return MetaClientInterface.ConnectState.NOT_CONNECTED;
+    switch (keeperState) {
+      case AuthFailed:
+        return MetaClientInterface.ConnectState.AUTH_FAILED;
+      case Closed:
+        return MetaClientInterface.ConnectState.CLOSED_BY_CLIENT;
+      case Disconnected:
+        return MetaClientInterface.ConnectState.DISCONNECTED;
+      case Expired:
+        return MetaClientInterface.ConnectState.EXPIRED;
+      case SaslAuthenticated:
+        return MetaClientInterface.ConnectState.AUTHENTICATED;
+      case SyncConnected: // falls through: read-only connections are also reported as CONNECTED
+      case ConnectedReadOnly:
+        return MetaClientInterface.ConnectState.CONNECTED;
+      default:
+        throw new IllegalArgumentException(keeperState + " is not a supported KeeperState.");
+    }
+  }
+
   /**
    * This function translate and group Zk exception code to metaclient code.
    * It currently includes all ZK code on 3.6.3.
diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
index f23ed0f03..49cbae1f3 100644
--- a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
+++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
@@ -19,43 +19,29 @@ package org.apache.helix.metaclient.impl.zk;
  * under the License.
  */
 
-import java.io.File;
-import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
-
-import org.apache.helix.metaclient.api.DataUpdater;
-import org.apache.helix.metaclient.api.MetaClientInterface;
-import org.apache.helix.metaclient.exception.MetaClientException;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.helix.metaclient.api.DataUpdater;
-import org.apache.helix.metaclient.api.DirectChildChangeListener;
-import org.apache.helix.metaclient.api.MetaClientInterface;
-import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.lang3.NotImplementedException;
+import org.apache.helix.metaclient.api.ConnectStateChangeListener;
 import org.apache.helix.metaclient.api.DataChangeListener;
+import org.apache.helix.metaclient.api.DataUpdater;
+import org.apache.helix.metaclient.api.DirectChildChangeListener;
+import org.apache.helix.metaclient.api.MetaClientInterface;
 import org.apache.helix.metaclient.api.Op;
 import org.apache.helix.metaclient.api.OpResult;
 import org.apache.helix.metaclient.exception.MetaClientException;
 import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
-import org.apache.helix.zookeeper.zkclient.IDefaultNameSpace;
-import org.apache.helix.zookeeper.zkclient.ZkServer;
 import org.apache.zookeeper.KeeperException;
 import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import static org.apache.helix.metaclient.api.MetaClientInterface.EntryMode.CONTAINER;
@@ -334,6 +320,35 @@ public class TestZkMetaClient extends ZkMetaClientTestBase{
     }
   }
 
+  @Test
+  public void testConnectStateChangeListener() throws Exception {
+    final String basePath = "/TestZkMetaClient_testConnectionStateChangeListener";
+    try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
+      CountDownLatch countDownLatch = new CountDownLatch(1); // released by the first state change
+      final MetaClientInterface.ConnectState[] connectState =
+          new MetaClientInterface.ConnectState[2];
+      ConnectStateChangeListener listener = new ConnectStateChangeListener() {
+        @Override
+        public void handleConnectStateChanged(MetaClientInterface.ConnectState prevState,
+            MetaClientInterface.ConnectState currentState) throws Exception {
+          connectState[0] = prevState;
+          connectState[1] = currentState;
+          countDownLatch.countDown();
+        }
+
+        @Override
+        public void handleConnectionEstablishmentError(Throwable error) throws Exception {
+
+        }
+      };
+      Assert.assertTrue(zkMetaClient.subscribeStateChanges(listener));
+      zkMetaClient.connect();
+      Assert.assertTrue(countDownLatch.await(5000, TimeUnit.MILLISECONDS));
+      Assert.assertEquals(connectState[0], MetaClientInterface.ConnectState.NOT_CONNECTED);
+      Assert.assertEquals(connectState[1], MetaClientInterface.ConnectState.CONNECTED);
+    }
+  }
+
   /**
    * Transactional op calls zk.multi() with a set of ops (operations)
    * and the return values are converted into metaclient opResults.
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkStateListener.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkStateListener.java
index 5970e623d..f2c4190de 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkStateListener.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/IZkStateListener.java
@@ -58,4 +58,8 @@ public interface IZkStateListener {
    *             On any error.
    */
   void handleSessionEstablishmentError(final Throwable error) throws Exception;
+
+  default void handleStateChanged(KeeperState prevState, KeeperState curState) throws Exception {
+    handleStateChanged(curState); // prevState is ignored unless an implementation overrides this
+  }
 }
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
index 31fe4e97a..51904ede6 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
@@ -1486,12 +1486,13 @@ public class ZkClient implements Watcher {
 
   protected void processStateChanged(WatchedEvent event) {
     LOG.info("zkclient {}, zookeeper state changed ( {} )", _uid, event.getState());
+    KeeperState prevState = _currentState;
     setCurrentState(event.getState());
     if (getShutdownTrigger()) {
       return;
     }
 
-    fireStateChangedEvent(event.getState());
+    fireStateChangedEvent(prevState, event.getState());
 
     /*
      *  Note, the intention is that only the ZkClient managing the session would do auto reconnect
@@ -1660,15 +1661,15 @@ public class ZkClient implements Watcher {
     }
   }
 
-  protected void fireStateChangedEvent(final KeeperState state) {
+  protected void fireStateChangedEvent(final KeeperState prevState, final KeeperState curState) {
     final String sessionId = getHexSessionId();
     for (final IZkStateListener stateListener : _stateListener) {
-      final String description = "State changed to " + state + " sent to " + stateListener;
+      final String description = "State changed to " + curState + " sent to " + stateListener;
       _eventThread.send(new ZkEventThread.ZkEvent(description, sessionId) {
 
         @Override
         public void run() throws Exception {
-          stateListener.handleStateChanged(state);
+          stateListener.handleStateChanged(prevState, curState);
         }
       });
     }