You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2023/08/28 20:58:18 UTC
[helix] 12/21: ZkClient - only register one time watcher for read data when not using persist listener. (#2555)
This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch metaclient
in repository https://gitbox.apache.org/repos/asf/helix.git
commit def0bb6d6b22789c0c40716a9233a3258082eb76
Author: xyuanlu <xy...@gmail.com>
AuthorDate: Thu Jul 13 10:25:10 2023 -0700
ZkClient - only register one time watcher for read data when not using persist listener. (#2555)
---------
Co-authored-by: Xiaoyuan Lu <xi...@xialu-mn2.linkedin.biz>
---
.../helix/metaclient/impl/zk/TestZkMetaClient.java | 22 +++++++
.../apache/helix/zookeeper/zkclient/ZkClient.java | 2 +-
.../zkclient/TestZkClientPersistWatcher.java | 75 +++++++++++++---------
3 files changed, 67 insertions(+), 32 deletions(-)
diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
index 8aca150b0..19f21977b 100644
--- a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
+++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
@@ -504,6 +504,28 @@ public class TestZkMetaClient extends ZkMetaClientTestBase{
}
}
+ @Test
+ public void testChangeListener() throws Exception {
+ final String basePath = "/TestZkMetaClient_ChangeListener";
+ final int count = 100;
+ try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
+ zkMetaClient.connect();
+ DataChangeListener listener = new DataChangeListener() {
+
+ @Override
+ public void handleDataChange(String key, Object data, ChangeType changeType)
+ throws Exception {
+ }
+ };
+ zkMetaClient.subscribeDataChange(basePath, listener, false);
+ zkMetaClient.create(basePath, "");
+ zkMetaClient.get(basePath);
+ zkMetaClient.exists(basePath);
+ zkMetaClient.getDataAndStat(basePath);
+ zkMetaClient.getDirectChildrenKeys(basePath);
+ }
+ }
+
/**
* Transactional op calls zk.multi() with a set of ops (operations)
* and the return values are converted into metaclient opResults.
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
index 064f6b494..2a06158d0 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
@@ -2232,7 +2232,7 @@ public class ZkClient implements Watcher {
@SuppressWarnings("unchecked")
public <T extends Object> T readData(String path, Stat stat) {
- return (T) readData(path, stat, hasChildOrDataListeners(path));
+ return (T) readData(path, stat, (!_usePersistWatcher) && hasChildOrDataListeners(path));
}
@SuppressWarnings("unchecked")
diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/zkclient/TestZkClientPersistWatcher.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/zkclient/TestZkClientPersistWatcher.java
index 76f5352c9..c54bca1ef 100644
--- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/zkclient/TestZkClientPersistWatcher.java
+++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/zkclient/TestZkClientPersistWatcher.java
@@ -41,8 +41,7 @@ public class TestZkClientPersistWatcher extends ZkTestBase {
void testZkClientDataChange() throws Exception {
org.apache.helix.zookeeper.impl.client.ZkClient.Builder builder =
new org.apache.helix.zookeeper.impl.client.ZkClient.Builder();
- builder.setZkServer(ZkTestBase.ZK_ADDR).setMonitorRootPathOnly(false)
- .setUsePersistWatcher(true);
+ builder.setZkServer(ZkTestBase.ZK_ADDR).setMonitorRootPathOnly(false).setUsePersistWatcher(true);
org.apache.helix.zookeeper.impl.client.ZkClient zkClient = builder.build();
zkClient.setZkSerializer(new BasicZkSerializer(new SerializableSerializer()));
int count = 1000;
@@ -63,8 +62,8 @@ public class TestZkClientPersistWatcher extends ZkTestBase {
zkClient.subscribeDataChanges(path, dataListener);
zkClient.create(path, "datat", CreateMode.PERSISTENT);
- for(int i=0; i<count; ++i) {
- zkClient.writeData(path, ("datat"+i), -1);
+ for (int i = 0; i < count; ++i) {
+ zkClient.writeData(path, ("datat" + i), -1);
}
Assert.assertTrue(countDownLatch1.await(15000, TimeUnit.MILLISECONDS));
@@ -75,8 +74,7 @@ public class TestZkClientPersistWatcher extends ZkTestBase {
void testZkClientChildChange() throws Exception {
org.apache.helix.zookeeper.impl.client.ZkClient.Builder builder =
new org.apache.helix.zookeeper.impl.client.ZkClient.Builder();
- builder.setZkServer(ZkTestBase.ZK_ADDR).setMonitorRootPathOnly(false)
- .setUsePersistWatcher(true);
+ builder.setZkServer(ZkTestBase.ZK_ADDR).setMonitorRootPathOnly(false).setUsePersistWatcher(true);
org.apache.helix.zookeeper.impl.client.ZkClient zkClient = builder.build();
zkClient.setZkSerializer(new BasicZkSerializer(new SerializableSerializer()));
int count = 100;
@@ -86,16 +84,14 @@ public class TestZkClientPersistWatcher extends ZkTestBase {
String path = "/testZkClientChildChange";
IZkChildListener childListener = new IZkChildListener() {
@Override
- public void handleChildChange(String parentPath, List<String> currentChilds)
- throws Exception {
+ public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
countDownLatch1.countDown();
- event_count[0]++ ;
+ event_count[0]++;
}
};
IZkChildListener childListener2 = new IZkChildListener() {
@Override
- public void handleChildChange(String parentPath, List<String> currentChilds)
- throws Exception {
+ public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
countDownLatch2.countDown();
event_count[0]++;
}
@@ -108,6 +104,7 @@ public class TestZkClientPersistWatcher extends ZkTestBase {
}
Assert.assertTrue(countDownLatch1.await(15000, TimeUnit.MILLISECONDS));
Assert.assertTrue(countDownLatch2.await(15000, TimeUnit.MILLISECONDS));
+ zkClient.deleteRecursively(path);
zkClient.close();
}
@@ -115,8 +112,7 @@ public class TestZkClientPersistWatcher extends ZkTestBase {
void testZkClientPersistRecursiveChange() throws Exception {
org.apache.helix.zookeeper.impl.client.ZkClient.Builder builder =
new org.apache.helix.zookeeper.impl.client.ZkClient.Builder();
- builder.setZkServer(ZkTestBase.ZK_ADDR).setMonitorRootPathOnly(false)
- .setUsePersistWatcher(true);
+ builder.setZkServer(ZkTestBase.ZK_ADDR).setMonitorRootPathOnly(false).setUsePersistWatcher(true);
org.apache.helix.zookeeper.impl.client.ZkClient zkClient = builder.build();
zkClient.setZkSerializer(new BasicZkSerializer(new SerializableSerializer()));
int count = 100;
@@ -124,39 +120,37 @@ public class TestZkClientPersistWatcher extends ZkTestBase {
final AtomicInteger[] event_count2 = {new AtomicInteger(0)};
// for each iteration, we will edit a node, create a child, create a grand child, and
// delete child. Expect 4 event per iteration. -> total event should be count*4
- CountDownLatch countDownLatch1 = new CountDownLatch(count*4);
+ CountDownLatch countDownLatch1 = new CountDownLatch(count * 4);
CountDownLatch countDownLatch2 = new CountDownLatch(count);
String path = "/testZkClientPersistRecursiveChange";
RecursivePersistListener rcListener = new RecursivePersistListener() {
@Override
- public void handleZNodeChange(String dataPath, Watcher.Event.EventType eventType)
- throws Exception {
+ public void handleZNodeChange(String dataPath, Watcher.Event.EventType eventType) throws Exception {
countDownLatch1.countDown();
- event_count[0].incrementAndGet() ;
+ event_count[0].incrementAndGet();
}
};
zkClient.create(path, "datat", CreateMode.PERSISTENT);
zkClient.subscribePersistRecursiveListener(path, rcListener);
- for (int i=0; i<count; ++i) {
+ for (int i = 0; i < count; ++i) {
zkClient.writeData(path, "data7" + i, -1);
- zkClient.create(path+"/c1_" +i , "datat", CreateMode.PERSISTENT);
- zkClient.create(path+"/c1_" +i + "/c2", "datat", CreateMode.PERSISTENT);
- zkClient.delete(path+"/c1_" +i + "/c2");
+ zkClient.create(path + "/c1_" + i, "datat", CreateMode.PERSISTENT);
+ zkClient.create(path + "/c1_" + i + "/c2", "datat", CreateMode.PERSISTENT);
+ zkClient.delete(path + "/c1_" + i + "/c2");
}
Assert.assertTrue(countDownLatch1.await(50000000, TimeUnit.MILLISECONDS));
// subscribe a persist child watch, it should throw exception
IZkChildListener childListener2 = new IZkChildListener() {
@Override
- public void handleChildChange(String parentPath, List<String> currentChilds)
- throws Exception {
+ public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
countDownLatch2.countDown();
event_count2[0].incrementAndGet();
}
};
try {
zkClient.subscribeChildChanges(path, childListener2, false);
- } catch ( Exception ex) {
+ } catch (Exception ex) {
Assert.assertEquals(ex.getClass().getName(), "java.lang.UnsupportedOperationException");
}
@@ -164,14 +158,15 @@ public class TestZkClientPersistWatcher extends ZkTestBase {
zkClient.unsubscribePersistRecursiveListener(path, rcListener);
zkClient.subscribeChildChanges(path, childListener2, false);
// we should only get 100 event since only 100 direct child change.
- for (int i=0; i<count; ++i) {
+ for (int i = 0; i < count; ++i) {
zkClient.writeData(path, "data7" + i, -1);
- zkClient.create(path+"/c2_" +i , "datat", CreateMode.PERSISTENT);
- zkClient.create(path+"/c2_" +i + "/c3", "datat", CreateMode.PERSISTENT);
- zkClient.delete(path+"/c2_" +i + "/c3");
+ zkClient.create(path + "/c2_" + i, "datat", CreateMode.PERSISTENT);
+ zkClient.create(path + "/c2_" + i + "/c3", "datat", CreateMode.PERSISTENT);
+ zkClient.delete(path + "/c2_" + i + "/c3");
}
Assert.assertTrue(countDownLatch2.await(50000000, TimeUnit.MILLISECONDS));
+ zkClient.deleteRecursively(path);
zkClient.close();
}
@@ -179,8 +174,7 @@ public class TestZkClientPersistWatcher extends ZkTestBase {
void testSubscribeOneTimeChangeWhenUsingPersistWatcher() {
org.apache.helix.zookeeper.impl.client.ZkClient.Builder builder =
new org.apache.helix.zookeeper.impl.client.ZkClient.Builder();
- builder.setZkServer(ZkTestBase.ZK_ADDR).setMonitorRootPathOnly(false)
- .setUsePersistWatcher(true);
+ builder.setZkServer(ZkTestBase.ZK_ADDR).setMonitorRootPathOnly(false).setUsePersistWatcher(true);
ZkClient zkClient = builder.build();
zkClient.setZkSerializer(new BasicZkSerializer(new SerializableSerializer()));
@@ -194,7 +188,7 @@ public class TestZkClientPersistWatcher extends ZkTestBase {
}
try {
- zkClient.readData(path, null, true);
+ zkClient.readData(path, null, true);
Assert.fail("Should throw exception when subscribe one time listener");
} catch (Exception e) {
Assert.assertEquals(e.getClass().getName(), "java.lang.IllegalArgumentException");
@@ -206,5 +200,24 @@ public class TestZkClientPersistWatcher extends ZkTestBase {
} catch (Exception e) {
Assert.assertEquals(e.getClass().getName(), "java.lang.IllegalArgumentException");
}
+ zkClient.delete(path);
+ zkClient.close();
+ }
+
+ @Test
+ void testCrudOperationWithResubscribe() {
+ org.apache.helix.zookeeper.impl.client.ZkClient.Builder builder =
+ new org.apache.helix.zookeeper.impl.client.ZkClient.Builder();
+ builder.setZkServer(ZkTestBase.ZK_ADDR).setMonitorRootPathOnly(false).setUsePersistWatcher(false);
+ ZkClient zkClient = builder.build();
+ zkClient.setZkSerializer(new BasicZkSerializer(new SerializableSerializer()));
+
+ String path = "/testCrudOperationWithResubscribe";
+ zkClient.create(path, "datat", CreateMode.PERSISTENT);
+ zkClient.exists(path, true);
+ zkClient.readData(path, null, true);
+ zkClient.getChildren(path, true);
+ zkClient.delete(path);
+ zkClient.close();
}
}
\ No newline at end of file