You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2023/08/28 20:58:18 UTC

[helix] 12/21: ZkClient - only register one time watcher for read data when not using persist listener. (#2555)

This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch metaclient
in repository https://gitbox.apache.org/repos/asf/helix.git

commit def0bb6d6b22789c0c40716a9233a3258082eb76
Author: xyuanlu <xy...@gmail.com>
AuthorDate: Thu Jul 13 10:25:10 2023 -0700

    ZkClient - only register one time watcher for read data when not using persist listener. (#2555)
    
    
    
    ---------
    
    Co-authored-by: Xiaoyuan Lu <xi...@xialu-mn2.linkedin.biz>
---
 .../helix/metaclient/impl/zk/TestZkMetaClient.java | 22 +++++++
 .../apache/helix/zookeeper/zkclient/ZkClient.java  |  2 +-
 .../zkclient/TestZkClientPersistWatcher.java       | 75 +++++++++++++---------
 3 files changed, 67 insertions(+), 32 deletions(-)

diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
index 8aca150b0..19f21977b 100644
--- a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
+++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
@@ -504,6 +504,28 @@ public class TestZkMetaClient extends ZkMetaClientTestBase{
     }
   }
 
+  @Test
+  public void testChangeListener() throws Exception {
+    final String basePath = "/TestZkMetaClient_ChangeListener";
+    final int count = 100;
+    try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
+      zkMetaClient.connect();
+      DataChangeListener listener = new DataChangeListener() {
+
+        @Override
+        public void handleDataChange(String key, Object data, ChangeType changeType)
+            throws Exception {
+        }
+      };
+      zkMetaClient.subscribeDataChange(basePath, listener, false);
+      zkMetaClient.create(basePath, "");
+      zkMetaClient.get(basePath);
+      zkMetaClient.exists(basePath);
+      zkMetaClient.getDataAndStat(basePath);
+      zkMetaClient.getDirectChildrenKeys(basePath);
+    }
+  }
+
   /**
    * Transactional op calls zk.multi() with a set of ops (operations)
    * and the return values are converted into metaclient opResults.
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
index 064f6b494..2a06158d0 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
@@ -2232,7 +2232,7 @@ public class ZkClient implements Watcher {
 
   @SuppressWarnings("unchecked")
   public <T extends Object> T readData(String path, Stat stat) {
-    return (T) readData(path, stat, hasChildOrDataListeners(path));
+    return (T) readData(path, stat, (!_usePersistWatcher) && hasChildOrDataListeners(path));
   }
 
   @SuppressWarnings("unchecked")
diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/zkclient/TestZkClientPersistWatcher.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/zkclient/TestZkClientPersistWatcher.java
index 76f5352c9..c54bca1ef 100644
--- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/zkclient/TestZkClientPersistWatcher.java
+++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/zkclient/TestZkClientPersistWatcher.java
@@ -41,8 +41,7 @@ public class TestZkClientPersistWatcher extends ZkTestBase {
   void testZkClientDataChange() throws Exception {
     org.apache.helix.zookeeper.impl.client.ZkClient.Builder builder =
         new org.apache.helix.zookeeper.impl.client.ZkClient.Builder();
-    builder.setZkServer(ZkTestBase.ZK_ADDR).setMonitorRootPathOnly(false)
-        .setUsePersistWatcher(true);
+    builder.setZkServer(ZkTestBase.ZK_ADDR).setMonitorRootPathOnly(false).setUsePersistWatcher(true);
     org.apache.helix.zookeeper.impl.client.ZkClient zkClient = builder.build();
     zkClient.setZkSerializer(new BasicZkSerializer(new SerializableSerializer()));
     int count = 1000;
@@ -63,8 +62,8 @@ public class TestZkClientPersistWatcher extends ZkTestBase {
 
     zkClient.subscribeDataChanges(path, dataListener);
     zkClient.create(path, "datat", CreateMode.PERSISTENT);
-    for(int i=0; i<count; ++i) {
-      zkClient.writeData(path, ("datat"+i), -1);
+    for (int i = 0; i < count; ++i) {
+      zkClient.writeData(path, ("datat" + i), -1);
     }
 
     Assert.assertTrue(countDownLatch1.await(15000, TimeUnit.MILLISECONDS));
@@ -75,8 +74,7 @@ public class TestZkClientPersistWatcher extends ZkTestBase {
   void testZkClientChildChange() throws Exception {
     org.apache.helix.zookeeper.impl.client.ZkClient.Builder builder =
         new org.apache.helix.zookeeper.impl.client.ZkClient.Builder();
-    builder.setZkServer(ZkTestBase.ZK_ADDR).setMonitorRootPathOnly(false)
-        .setUsePersistWatcher(true);
+    builder.setZkServer(ZkTestBase.ZK_ADDR).setMonitorRootPathOnly(false).setUsePersistWatcher(true);
     org.apache.helix.zookeeper.impl.client.ZkClient zkClient = builder.build();
     zkClient.setZkSerializer(new BasicZkSerializer(new SerializableSerializer()));
     int count = 100;
@@ -86,16 +84,14 @@ public class TestZkClientPersistWatcher extends ZkTestBase {
     String path = "/testZkClientChildChange";
     IZkChildListener childListener = new IZkChildListener() {
       @Override
-      public void handleChildChange(String parentPath, List<String> currentChilds)
-          throws Exception {
+      public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
         countDownLatch1.countDown();
-        event_count[0]++ ;
+        event_count[0]++;
       }
     };
     IZkChildListener childListener2 = new IZkChildListener() {
       @Override
-      public void handleChildChange(String parentPath, List<String> currentChilds)
-          throws Exception {
+      public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
         countDownLatch2.countDown();
         event_count[0]++;
       }
@@ -108,6 +104,7 @@ public class TestZkClientPersistWatcher extends ZkTestBase {
     }
     Assert.assertTrue(countDownLatch1.await(15000, TimeUnit.MILLISECONDS));
     Assert.assertTrue(countDownLatch2.await(15000, TimeUnit.MILLISECONDS));
+    zkClient.deleteRecursively(path);
     zkClient.close();
   }
 
@@ -115,8 +112,7 @@ public class TestZkClientPersistWatcher extends ZkTestBase {
   void testZkClientPersistRecursiveChange() throws Exception {
     org.apache.helix.zookeeper.impl.client.ZkClient.Builder builder =
         new org.apache.helix.zookeeper.impl.client.ZkClient.Builder();
-    builder.setZkServer(ZkTestBase.ZK_ADDR).setMonitorRootPathOnly(false)
-        .setUsePersistWatcher(true);
+    builder.setZkServer(ZkTestBase.ZK_ADDR).setMonitorRootPathOnly(false).setUsePersistWatcher(true);
     org.apache.helix.zookeeper.impl.client.ZkClient zkClient = builder.build();
     zkClient.setZkSerializer(new BasicZkSerializer(new SerializableSerializer()));
     int count = 100;
@@ -124,39 +120,37 @@ public class TestZkClientPersistWatcher extends ZkTestBase {
     final AtomicInteger[] event_count2 = {new AtomicInteger(0)};
     // for each iteration, we will edit a node, create a child, create a grand child, and
     // delete child. Expect 4 event per iteration. -> total event should be count*4
-    CountDownLatch countDownLatch1 = new CountDownLatch(count*4);
+    CountDownLatch countDownLatch1 = new CountDownLatch(count * 4);
     CountDownLatch countDownLatch2 = new CountDownLatch(count);
     String path = "/testZkClientPersistRecursiveChange";
     RecursivePersistListener rcListener = new RecursivePersistListener() {
       @Override
-      public void handleZNodeChange(String dataPath, Watcher.Event.EventType eventType)
-          throws Exception {
+      public void handleZNodeChange(String dataPath, Watcher.Event.EventType eventType) throws Exception {
         countDownLatch1.countDown();
-        event_count[0].incrementAndGet() ;
+        event_count[0].incrementAndGet();
       }
     };
     zkClient.create(path, "datat", CreateMode.PERSISTENT);
     zkClient.subscribePersistRecursiveListener(path, rcListener);
-    for (int i=0; i<count; ++i) {
+    for (int i = 0; i < count; ++i) {
       zkClient.writeData(path, "data7" + i, -1);
-      zkClient.create(path+"/c1_" +i , "datat", CreateMode.PERSISTENT);
-      zkClient.create(path+"/c1_" +i + "/c2", "datat", CreateMode.PERSISTENT);
-      zkClient.delete(path+"/c1_" +i + "/c2");
+      zkClient.create(path + "/c1_" + i, "datat", CreateMode.PERSISTENT);
+      zkClient.create(path + "/c1_" + i + "/c2", "datat", CreateMode.PERSISTENT);
+      zkClient.delete(path + "/c1_" + i + "/c2");
     }
     Assert.assertTrue(countDownLatch1.await(50000000, TimeUnit.MILLISECONDS));
 
     // subscribe a persist child watch, it should throw exception
     IZkChildListener childListener2 = new IZkChildListener() {
       @Override
-      public void handleChildChange(String parentPath, List<String> currentChilds)
-          throws Exception {
+      public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
         countDownLatch2.countDown();
         event_count2[0].incrementAndGet();
       }
     };
     try {
       zkClient.subscribeChildChanges(path, childListener2, false);
-    } catch ( Exception ex) {
+    } catch (Exception ex) {
       Assert.assertEquals(ex.getClass().getName(), "java.lang.UnsupportedOperationException");
     }
 
@@ -164,14 +158,15 @@ public class TestZkClientPersistWatcher extends ZkTestBase {
     zkClient.unsubscribePersistRecursiveListener(path, rcListener);
     zkClient.subscribeChildChanges(path, childListener2, false);
     // we should only get 100 event since only 100 direct child change.
-    for (int i=0; i<count; ++i) {
+    for (int i = 0; i < count; ++i) {
       zkClient.writeData(path, "data7" + i, -1);
-      zkClient.create(path+"/c2_" +i , "datat", CreateMode.PERSISTENT);
-      zkClient.create(path+"/c2_" +i + "/c3", "datat", CreateMode.PERSISTENT);
-      zkClient.delete(path+"/c2_" +i + "/c3");
+      zkClient.create(path + "/c2_" + i, "datat", CreateMode.PERSISTENT);
+      zkClient.create(path + "/c2_" + i + "/c3", "datat", CreateMode.PERSISTENT);
+      zkClient.delete(path + "/c2_" + i + "/c3");
     }
     Assert.assertTrue(countDownLatch2.await(50000000, TimeUnit.MILLISECONDS));
 
+    zkClient.deleteRecursively(path);
     zkClient.close();
   }
 
@@ -179,8 +174,7 @@ public class TestZkClientPersistWatcher extends ZkTestBase {
   void testSubscribeOneTimeChangeWhenUsingPersistWatcher() {
     org.apache.helix.zookeeper.impl.client.ZkClient.Builder builder =
         new org.apache.helix.zookeeper.impl.client.ZkClient.Builder();
-    builder.setZkServer(ZkTestBase.ZK_ADDR).setMonitorRootPathOnly(false)
-        .setUsePersistWatcher(true);
+    builder.setZkServer(ZkTestBase.ZK_ADDR).setMonitorRootPathOnly(false).setUsePersistWatcher(true);
     ZkClient zkClient = builder.build();
     zkClient.setZkSerializer(new BasicZkSerializer(new SerializableSerializer()));
 
@@ -194,7 +188,7 @@ public class TestZkClientPersistWatcher extends ZkTestBase {
     }
 
     try {
-      zkClient.readData(path, null,  true);
+      zkClient.readData(path, null, true);
       Assert.fail("Should throw exception when subscribe one time listener");
     } catch (Exception e) {
       Assert.assertEquals(e.getClass().getName(), "java.lang.IllegalArgumentException");
@@ -206,5 +200,24 @@ public class TestZkClientPersistWatcher extends ZkTestBase {
     } catch (Exception e) {
       Assert.assertEquals(e.getClass().getName(), "java.lang.IllegalArgumentException");
     }
+    zkClient.delete(path);
+    zkClient.close();
+  }
+
+  @Test
+  void testCrudOperationWithResubscribe() {
+    org.apache.helix.zookeeper.impl.client.ZkClient.Builder builder =
+        new org.apache.helix.zookeeper.impl.client.ZkClient.Builder();
+    builder.setZkServer(ZkTestBase.ZK_ADDR).setMonitorRootPathOnly(false).setUsePersistWatcher(false);
+    ZkClient zkClient = builder.build();
+    zkClient.setZkSerializer(new BasicZkSerializer(new SerializableSerializer()));
+
+    String path = "/testCrudOperationWithResubscribe";
+    zkClient.create(path, "datat", CreateMode.PERSISTENT);
+    zkClient.exists(path, true);
+    zkClient.readData(path, null, true);
+    zkClient.getChildren(path, true);
+    zkClient.delete(path);
+    zkClient.close();
   }
 }
\ No newline at end of file