You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by xy...@apache.org on 2023/07/20 17:05:42 UTC
[helix] branch metaclient updated: Multithreading stress test lattice - CRUD puppies and Listener Tests (#2548)
This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch metaclient
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/metaclient by this push:
new 9f36c1062 Multithreading stress test lattice - CRUD puppies and Listener Tests (#2548)
9f36c1062 is described below
commit 9f36c1062743664175118b3b4698aa73600b09d4
Author: Marcos Rico Peng <55...@users.noreply.github.com>
AuthorDate: Thu Jul 20 13:05:37 2023 -0400
Multithreading stress test lattice - CRUD puppies and Listener Tests (#2548)
---------
Co-authored-by: mapeng <ma...@linkedin.com>
---
.../helix/metaclient/impl/zk/ZkMetaClient.java | 6 +-
.../zk/TestMultiThreadStressTest/CreatePuppy.java | 78 +++++
.../zk/TestMultiThreadStressTest/DeletePuppy.java | 67 ++++
.../zk/TestMultiThreadStressTest/GetPuppy.java | 68 ++++
.../zk/TestMultiThreadStressTest/SetPuppy.java | 71 +++++
.../TestMultiThreadStressZKClient.java | 352 +++++++++++++++++++++
.../zk/TestMultiThreadStressTest/UpdatePuppy.java | 74 +++++
.../helix/metaclient/puppy/AbstractPuppy.java | 7 +-
8 files changed, 717 insertions(+), 6 deletions(-)
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
index 7f68ec9ca..691f31cde 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
@@ -101,9 +101,7 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
try {
create(key, data, EntryMode.PERSISTENT);
} catch (ZkException e) {
- throw ZkMetaClientUtil.translateZkExceptionToMetaclientException(e);
- } catch (Exception e) {
- throw new MetaClientException(e);
+ throw translateZkExceptionToMetaclientException(e);
}
}
@@ -113,7 +111,7 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
try {
_zkClient.create(key, data, ZkMetaClientUtil.convertMetaClientMode(mode));
} catch (ZkException e) {
- throw ZkMetaClientUtil.translateZkExceptionToMetaclientException(e);
+ throw translateZkExceptionToMetaclientException(e);
} catch (KeeperException e) {
throw new MetaClientException(e);
}
diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestMultiThreadStressTest/CreatePuppy.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestMultiThreadStressTest/CreatePuppy.java
new file mode 100644
index 000000000..3e28df06b
--- /dev/null
+++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestMultiThreadStressTest/CreatePuppy.java
@@ -0,0 +1,78 @@
+package org.apache.helix.metaclient.impl.zk.TestMultiThreadStressTest;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.metaclient.api.MetaClientInterface;
+import org.apache.helix.metaclient.exception.MetaClientNodeExistsException;
+import org.apache.helix.metaclient.puppy.AbstractPuppy;
+import org.apache.helix.metaclient.puppy.PuppySpec;
+
+import java.util.Random;
+
+/**
+ * Stress-test puppy that creates child nodes under {@code /test}.
+ *
+ * <p>On every bark it picks a random child name in
+ * {@code [0, PuppySpec#getNumberDiffPaths())}. With probability
+ * {@code PuppySpec#getErrorRate()} it instead issues a create on a malformed path and expects
+ * the client to reject it with {@link IllegalArgumentException}.
+ */
+public class CreatePuppy extends AbstractPuppy {
+
+  private final Random _random;
+  private final String _parentPath = "/test";
+
+  public CreatePuppy(MetaClientInterface<String> metaclient, PuppySpec puppySpec) {
+    super(metaclient, puppySpec);
+    _random = new Random();
+  }
+
+  /**
+   * Performs one create attempt, either a deliberately invalid one or a normal one.
+   *
+   * <p>A successful normal create is recorded in {@code _eventChangeCounterMap} under the
+   * child's name. A {@link MetaClientNodeExistsException} is expected when the node is already
+   * there and is only logged.
+   */
+  @Override
+  protected void bark() {
+    // Implement the chaos logic for creating nodes
+    int randomNumber = _random.nextInt(_puppySpec.getNumberDiffPaths());
+    if (shouldIntroduceError()) {
+      try {
+        // Simulate an error by creating an invalid path
+        _metaclient.create("invalid", "test");
+        // Reaching this line means the invalid path was accepted. Count it as an unhandled
+        // error, the same way DeletePuppy and GetPuppy do, so the test can notice.
+        _unhandledErrorCounter++;
+      } catch (IllegalArgumentException e) { // Catch invalid exception
+        System.out.println(Thread.currentThread().getName() + " tried to create an invalid path" + " at time: " + System.currentTimeMillis());
+        // Expected exception
+      }
+    } else {
+      // Normal behavior - create a new node
+      try {
+        System.out.println(Thread.currentThread().getName() + " is attempting to create node: " + randomNumber + " at time: " + System.currentTimeMillis());
+        _metaclient.create(_parentPath + "/" + randomNumber, "test");
+        System.out.println(Thread.currentThread().getName() + " successfully created node " + randomNumber + " at time: " + System.currentTimeMillis());
+        _eventChangeCounterMap.merge(String.valueOf(randomNumber), 1, Integer::sum);
+      } catch (MetaClientNodeExistsException e) {
+        // Expected exception
+        System.out.println(Thread.currentThread().getName() + " failed to create node " + randomNumber + " at time: " + System.currentTimeMillis() + ", it already exists");
+      }
+    }
+  }
+
+  /** Removes the shared parent node and everything below it. */
+  @Override
+  protected void cleanup() {
+    _metaclient.recursiveDelete(_parentPath);
+  }
+
+  /** Returns {@code true} with probability {@code PuppySpec#getErrorRate()}. */
+  private boolean shouldIntroduceError() {
+    float randomValue = _random.nextFloat();
+    return randomValue < _puppySpec.getErrorRate();
+  }
+}
+
+
+
diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestMultiThreadStressTest/DeletePuppy.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestMultiThreadStressTest/DeletePuppy.java
new file mode 100644
index 000000000..e0e1b7b5c
--- /dev/null
+++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestMultiThreadStressTest/DeletePuppy.java
@@ -0,0 +1,67 @@
+package org.apache.helix.metaclient.impl.zk.TestMultiThreadStressTest;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.metaclient.api.MetaClientInterface;
+import org.apache.helix.metaclient.puppy.AbstractPuppy;
+import org.apache.helix.metaclient.puppy.PuppySpec;
+
+import java.util.Random;
+
+/**
+ * Stress-test puppy that deletes child nodes under {@code /test}.
+ *
+ * <p>Each bark targets a random child name in {@code [0, PuppySpec#getNumberDiffPaths())}.
+ * With probability {@code PuppySpec#getErrorRate()} it instead issues a delete on a malformed
+ * path and expects an {@link IllegalArgumentException}.
+ */
+public class DeletePuppy extends AbstractPuppy {
+
+  private final Random _random;
+  private final String _parentPath = "/test";
+
+  public DeletePuppy(MetaClientInterface<String> metaclient, PuppySpec puppySpec) {
+    super(metaclient, puppySpec);
+    _random = new Random();
+  }
+
+  /** Performs one delete attempt, either a deliberately invalid one or a normal one. */
+  @Override
+  protected void bark() {
+    int nodeId = _random.nextInt(_puppySpec.getNumberDiffPaths());
+    if (!shouldIntroduceError()) {
+      deleteNode(nodeId);
+      return;
+    }
+
+    try {
+      _metaclient.delete("invalid");
+      // The malformed path was accepted: record it as an unexpected outcome.
+      _unhandledErrorCounter++;
+    } catch (IllegalArgumentException e) {
+      System.out.println(Thread.currentThread().getName() + " intentionally deleted an invalid path" + " at time: " + System.currentTimeMillis() );
+    }
+  }
+
+  /**
+   * Deletes {@code /test/<nodeId>} and, when the client reports success, records the change
+   * in {@code _eventChangeCounterMap}.
+   */
+  private void deleteNode(int nodeId) {
+    String threadName = Thread.currentThread().getName();
+    System.out.println(threadName + " is attempting to delete node: " + nodeId + " at time: " + System.currentTimeMillis());
+
+    boolean deleted = _metaclient.delete(_parentPath + "/" + nodeId);
+    if (!deleted) {
+      System.out.println(threadName + " failed to delete node " + nodeId + " at time: " + System.currentTimeMillis() + ", it does not exist");
+      return;
+    }
+
+    System.out.println(threadName + " successfully deleted node " + nodeId + " at time: " + System.currentTimeMillis());
+    _eventChangeCounterMap.merge(String.valueOf(nodeId), 1, Integer::sum);
+  }
+
+  /** Removes the shared parent node and everything below it. */
+  @Override
+  protected void cleanup() {
+    _metaclient.recursiveDelete(_parentPath);
+  }
+
+  /** Returns {@code true} with probability {@code PuppySpec#getErrorRate()}. */
+  private boolean shouldIntroduceError() {
+    float draw = _random.nextFloat();
+    return draw < _puppySpec.getErrorRate();
+  }
+}
\ No newline at end of file
diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestMultiThreadStressTest/GetPuppy.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestMultiThreadStressTest/GetPuppy.java
new file mode 100644
index 000000000..fe24b2bd3
--- /dev/null
+++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestMultiThreadStressTest/GetPuppy.java
@@ -0,0 +1,68 @@
+package org.apache.helix.metaclient.impl.zk.TestMultiThreadStressTest;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.metaclient.api.MetaClientInterface;
+import org.apache.helix.metaclient.puppy.AbstractPuppy;
+import org.apache.helix.metaclient.puppy.PuppySpec;
+
+import java.util.Objects;
+import java.util.Random;
+
+/**
+ * Stress-test puppy that reads child nodes under {@code /test}.
+ *
+ * <p>Each bark targets a random child name in {@code [0, PuppySpec#getNumberDiffPaths())}.
+ * With probability {@code PuppySpec#getErrorRate()} it instead reads a malformed path and
+ * expects an {@link IllegalArgumentException}. Reads never touch
+ * {@code _eventChangeCounterMap}.
+ */
+public class GetPuppy extends AbstractPuppy {
+
+  private final Random _random;
+  private final String _parentPath = "/test";
+
+  public GetPuppy(MetaClientInterface<String> metaclient, PuppySpec puppySpec) {
+    super(metaclient, puppySpec);
+    _random = new Random();
+  }
+
+  /** Performs one read attempt, either a deliberately invalid one or a normal one. */
+  @Override
+  protected void bark() {
+    int nodeId = _random.nextInt(_puppySpec.getNumberDiffPaths());
+    if (!shouldIntroduceError()) {
+      readNode(nodeId);
+      return;
+    }
+
+    try {
+      _metaclient.get("invalid");
+      // The malformed path was accepted: record it as an unexpected outcome.
+      _unhandledErrorCounter++;
+    } catch (IllegalArgumentException e) {
+      System.out.println(Thread.currentThread().getName() + " intentionally tried to read an invalid path" + " at time: " + System.currentTimeMillis());
+    }
+  }
+
+  /** Reads {@code /test/<nodeId>} and logs whether a value came back; null is logged as a missing node. */
+  private void readNode(int nodeId) {
+    String threadName = Thread.currentThread().getName();
+    System.out.println(threadName + " is attempting to read node: " + nodeId + " at time: " + System.currentTimeMillis());
+
+    String nodeValue = _metaclient.get(_parentPath + "/" + nodeId);
+    if (nodeValue != null) {
+      System.out.println(threadName + " successfully read node " + nodeId + " at time: " + System.currentTimeMillis());
+    } else {
+      System.out.println(threadName + " failed to read node " + nodeId + " at time: " + System.currentTimeMillis() + ", it does not exist");
+    }
+  }
+
+  /** Removes the shared parent node and everything below it. */
+  @Override
+  protected void cleanup() {
+    _metaclient.recursiveDelete(_parentPath);
+  }
+
+  /** Returns {@code true} with probability {@code PuppySpec#getErrorRate()}. */
+  private boolean shouldIntroduceError() {
+    float draw = _random.nextFloat();
+    return draw < _puppySpec.getErrorRate();
+  }
+}
diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestMultiThreadStressTest/SetPuppy.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestMultiThreadStressTest/SetPuppy.java
new file mode 100644
index 000000000..c0de4ece7
--- /dev/null
+++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestMultiThreadStressTest/SetPuppy.java
@@ -0,0 +1,71 @@
+package org.apache.helix.metaclient.impl.zk.TestMultiThreadStressTest;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.metaclient.api.MetaClientInterface;
+import org.apache.helix.metaclient.exception.MetaClientNoNodeException;
+import org.apache.helix.metaclient.puppy.AbstractPuppy;
+import org.apache.helix.metaclient.puppy.PuppySpec;
+
+import java.util.Random;
+
+/**
+ * Stress-test puppy that overwrites the data of child nodes under {@code /test}.
+ *
+ * <p>Each bark targets a random child name in {@code [0, PuppySpec#getNumberDiffPaths())}.
+ * With probability {@code PuppySpec#getErrorRate()} it instead calls set on a malformed path
+ * and expects an {@link IllegalArgumentException}.
+ */
+public class SetPuppy extends AbstractPuppy {
+
+  private final Random _random;
+  private final String _parentPath = "/test";
+
+  public SetPuppy(MetaClientInterface<String> metaclient, PuppySpec puppySpec) {
+    super(metaclient, puppySpec);
+    _random = new Random();
+  }
+
+  /**
+   * Performs one set attempt, either a deliberately invalid one or a normal one.
+   *
+   * <p>A successful set is recorded in {@code _eventChangeCounterMap} under the child's name.
+   * A {@link MetaClientNoNodeException} is expected when the node is absent and is only
+   * logged.
+   */
+  @Override
+  protected void bark() {
+    int randomNumber = _random.nextInt(_puppySpec.getNumberDiffPaths());
+    if (shouldIntroduceError()) {
+      try {
+        _metaclient.set("invalid", "test", -1);
+        // Reaching this line means the invalid path was accepted. Count it as an unhandled
+        // error, the same way DeletePuppy and GetPuppy do, so the test can notice.
+        _unhandledErrorCounter++;
+      } catch (IllegalArgumentException e) {
+        System.out.println(Thread.currentThread().getName() + " intentionally called set on an invalid path" + " at time: " + System.currentTimeMillis());
+      }
+    } else {
+      try {
+        System.out.println(Thread.currentThread().getName() + " is attempting to set node: " + randomNumber + " at time: " + System.currentTimeMillis());
+        _metaclient.set(_parentPath + "/" + randomNumber, "test", -1);
+        _eventChangeCounterMap.merge(String.valueOf(randomNumber), 1, Integer::sum);
+        System.out.println(
+            Thread.currentThread().getName() + " successfully set node " + randomNumber + " at time: "
+                + System.currentTimeMillis());
+      } catch (MetaClientNoNodeException e) {
+        System.out.println(Thread.currentThread().getName() + " failed to set node " + randomNumber + " at time: " + System.currentTimeMillis() + ", it does not exist");
+      }
+    }
+  }
+
+  /** Removes the shared parent node and everything below it. */
+  @Override
+  protected void cleanup() {
+    _metaclient.recursiveDelete(_parentPath);
+  }
+
+  /** Returns {@code true} with probability {@code PuppySpec#getErrorRate()}. */
+  private boolean shouldIntroduceError() {
+    float randomValue = _random.nextFloat();
+    return randomValue < _puppySpec.getErrorRate();
+  }
+}
diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestMultiThreadStressTest/TestMultiThreadStressZKClient.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestMultiThreadStressTest/TestMultiThreadStressZKClient.java
new file mode 100644
index 000000000..5ee026bc1
--- /dev/null
+++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestMultiThreadStressTest/TestMultiThreadStressZKClient.java
@@ -0,0 +1,352 @@
+package org.apache.helix.metaclient.impl.zk.TestMultiThreadStressTest;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.metaclient.api.ChildChangeListener;
+import org.apache.helix.metaclient.impl.zk.ZkMetaClient;
+import org.apache.helix.metaclient.impl.zk.ZkMetaClientTestBase;
+import org.apache.helix.metaclient.puppy.AbstractPuppy;
+import org.apache.helix.metaclient.puppy.ExecDelay;
+import org.apache.helix.metaclient.puppy.PuppyManager;
+import org.apache.helix.metaclient.puppy.PuppyMode;
+import org.apache.helix.metaclient.puppy.PuppySpec;
+import org.testng.Assert;
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Multi-threaded stress tests for {@link ZkMetaClient}.
+ *
+ * <p>Each test creates the shared parent node {@code /test}, lets a set of puppies hammer it
+ * through one shared client, and then checks that no puppy recorded an unexpected outcome.
+ * The listener tests additionally compare the number of changes the puppies recorded with the
+ * number of callbacks the listeners received.
+ *
+ * <p>The parent node is removed in a {@code finally} block so a failing test does not leave
+ * {@code /test} behind and make the next test's create fail.
+ */
+public class TestMultiThreadStressZKClient extends ZkMetaClientTestBase {
+
+  private ZkMetaClient<String> _zkMetaClient;
+  private final String zkParentKey = "/test";
+
+  private final long TIMEOUT = 60; // The desired timeout duration of tests in seconds
+
+  @BeforeTest
+  private void setUp() {
+    this._zkMetaClient = createZkMetaClient();
+    this._zkMetaClient.connect();
+  }
+
+  /**
+   * Closes the client opened in {@link #setUp()} so the connection is not leaked.
+   * NOTE(review): assumes createZkMetaClient() returns a client owned by this class - confirm.
+   */
+  @AfterTest
+  private void closeStressTestClient() throws Exception {
+    if (_zkMetaClient != null) {
+      _zkMetaClient.close();
+    }
+  }
+
+  @Test
+  public void testCreatePuppy() {
+    withParentNode(() -> {
+      PuppySpec puppySpec = newPuppySpec(5);
+      PuppyManager puppyManager = runPuppies(
+          new CreatePuppy(_zkMetaClient, puppySpec),
+          new CreatePuppy(_zkMetaClient, puppySpec),
+          new CreatePuppy(_zkMetaClient, puppySpec));
+      assertNoExceptions(puppyManager, null);
+    });
+  }
+
+  @Test
+  public void testDeletePuppy() {
+    withParentNode(() -> {
+      PuppySpec puppySpec = newPuppySpec(5);
+      PuppyManager puppyManager = runPuppies(
+          new CreatePuppy(_zkMetaClient, puppySpec),
+          new DeletePuppy(_zkMetaClient, puppySpec));
+      assertNoExceptions(puppyManager, null);
+    });
+  }
+
+  @Test
+  public void testGetPuppy() {
+    withParentNode(() -> {
+      PuppySpec puppySpec = newPuppySpec(5);
+      PuppyManager puppyManager = runPuppies(
+          new CreatePuppy(_zkMetaClient, puppySpec),
+          new GetPuppy(_zkMetaClient, puppySpec));
+      assertNoExceptions(puppyManager, null);
+    });
+  }
+
+  @Test
+  public void testSetPuppy() {
+    withParentNode(() -> {
+      PuppySpec puppySpec = newPuppySpec(5);
+      PuppyManager puppyManager = runPuppies(
+          new CreatePuppy(_zkMetaClient, puppySpec),
+          new SetPuppy(_zkMetaClient, puppySpec));
+      assertNoExceptions(puppyManager, null);
+    });
+  }
+
+  @Test
+  public void testUpdatePuppy() {
+    withParentNode(() -> {
+      PuppySpec puppySpec = newPuppySpec(5);
+      PuppyManager puppyManager = runPuppies(
+          new CreatePuppy(_zkMetaClient, puppySpec),
+          new UpdatePuppy(_zkMetaClient, puppySpec));
+      assertNoExceptions(puppyManager, null);
+    });
+  }
+
+  @Test
+  public void testCrudPuppies() {
+    withParentNode(() -> {
+      PuppyManager puppyManager = runPuppies(newCrudPuppies(newPuppySpec(5)));
+      assertNoExceptions(puppyManager, null);
+    });
+  }
+
+  @Test
+  public void testBasicParentListenerPuppy() {
+    withParentNode(() -> {
+      AtomicInteger globalChildChangeCounter = new AtomicInteger();
+      ChildChangeListener childChangeListener = newCountingListener(globalChildChangeCounter);
+      _zkMetaClient.subscribeChildChanges(zkParentKey, childChangeListener, false);
+      try {
+        PuppyManager puppyManager = runPuppies(new CreatePuppy(_zkMetaClient, newPuppySpec(5)));
+        assertNoExceptions(puppyManager, globalChildChangeCounter);
+      } finally {
+        _zkMetaClient.unsubscribeChildChanges(zkParentKey, childChangeListener);
+      }
+    });
+  }
+
+  @Test
+  public void testComplexParentListenerPuppy() {
+    withParentNode(() -> {
+      // Global counter for all child changes
+      AtomicInteger globalChildChangeCounter = new AtomicInteger();
+      ChildChangeListener childChangeListener = newCountingListener(globalChildChangeCounter);
+      _zkMetaClient.subscribeChildChanges(zkParentKey, childChangeListener, false);
+      try {
+        PuppyManager puppyManager = runPuppies(newCrudPuppies(newPuppySpec(5)));
+        assertNoExceptions(puppyManager, globalChildChangeCounter);
+      } finally {
+        // Unsubscribe before the parent is removed, as testBasicParentListenerPuppy does.
+        _zkMetaClient.unsubscribeChildChanges(zkParentKey, childChangeListener);
+      }
+    });
+  }
+
+  @Test
+  public void testChildListenerPuppy() {
+    withParentNode(() -> {
+      // Setting num diff paths to 3 until we find a better way of scaling listeners.
+      int numDiffPaths = 3;
+      PuppySpec puppySpec = newPuppySpec(numDiffPaths);
+
+      // One counting child listener per child path the puppies can touch.
+      AtomicInteger[] childChangeCounters = new AtomicInteger[numDiffPaths];
+      ChildChangeListener[] childChangeListeners = new ChildChangeListener[numDiffPaths];
+      for (int i = 0; i < numDiffPaths; i++) {
+        childChangeCounters[i] = new AtomicInteger();
+        childChangeListeners[i] = newCountingListener(childChangeCounters[i]);
+        _zkMetaClient.subscribeChildChanges(zkParentKey + "/" + i, childChangeListeners[i], false);
+      }
+
+      try {
+        PuppyManager puppyManager = runPuppies(newCrudPuppies(puppySpec));
+        assertNoExceptions(puppyManager, null);
+
+        // Add all event changes from all puppies and compare with child change listener
+        // Inner merged by path
+        Map<String, Integer> mergedEventChangeCounterMap = new HashMap<>();
+        for (AbstractPuppy puppy : puppyManager.getPuppies()) {
+          puppy._eventChangeCounterMap.forEach(
+              (key, value) -> mergedEventChangeCounterMap.merge(key, value, Integer::sum));
+        }
+
+        System.out.println("Merged event change counter map: " + mergedEventChangeCounterMap);
+        for (int i = 0; i < numDiffPaths; i++) {
+          System.out.println("Child change counter " + i + ": " + childChangeCounters[i]);
+        }
+        for (int i = 0; i < numDiffPaths; i++) {
+          Assert.assertEquals(childChangeCounters[i].get(),
+              mergedEventChangeCounterMap.getOrDefault(String.valueOf(i), 0).intValue());
+        }
+      } finally {
+        for (int i = 0; i < numDiffPaths; i++) {
+          _zkMetaClient.unsubscribeChildChanges(zkParentKey + "/" + i, childChangeListeners[i]);
+        }
+      }
+    });
+  }
+
+  /**
+   * Creates the parent node, runs {@code scenario}, and always removes the parent node
+   * afterwards. When the scenario passes, also verifies that nothing is left below the parent.
+   */
+  private void withParentNode(Runnable scenario) {
+    _zkMetaClient.create(zkParentKey, "test");
+    try {
+      scenario.run();
+    } finally {
+      // cleanup
+      _zkMetaClient.recursiveDelete(zkParentKey);
+    }
+    Assert.assertEquals(_zkMetaClient.countDirectChildren(zkParentKey), 0);
+  }
+
+  /** Builds the spec shared by all tests; only the number of distinct child paths varies. */
+  private PuppySpec newPuppySpec(int numDiffPaths) {
+    return new PuppySpec(PuppyMode.REPEAT, 0.2f, new ExecDelay(5000, 0.1f), numDiffPaths);
+  }
+
+  /** Builds one puppy of each CRUD kind, all sharing the test client and {@code puppySpec}. */
+  private AbstractPuppy[] newCrudPuppies(PuppySpec puppySpec) {
+    return new AbstractPuppy[] {
+        new CreatePuppy(_zkMetaClient, puppySpec),
+        new GetPuppy(_zkMetaClient, puppySpec),
+        new DeletePuppy(_zkMetaClient, puppySpec),
+        new SetPuppy(_zkMetaClient, puppySpec),
+        new UpdatePuppy(_zkMetaClient, puppySpec)
+    };
+  }
+
+  /** Registers the puppies with a new manager, starts it with {@code TIMEOUT}, and returns it. */
+  private PuppyManager runPuppies(AbstractPuppy... puppies) {
+    PuppyManager puppyManager = new PuppyManager();
+    for (AbstractPuppy puppy : puppies) {
+      puppyManager.addPuppy(puppy);
+    }
+    puppyManager.start(TIMEOUT);
+    return puppyManager;
+  }
+
+  /** Returns a listener that increments {@code counter} and logs every callback. */
+  private ChildChangeListener newCountingListener(AtomicInteger counter) {
+    return (changedPath, changeType) -> {
+      int numChanges = counter.incrementAndGet();
+      System.out.println("-------------- Child change detected: " + changeType + " at path: " + changedPath + " number of changes: " + numChanges);
+    };
+  }
+
+  /**
+   * Asserts that no puppy recorded an unhandled error and, when {@code globalChangeCounter} is
+   * non-null, that it equals the total number of changes the puppies recorded.
+   *
+   * @param puppyManager manager whose puppies have finished running
+   * @param globalChangeCounter listener-side change count, or {@code null} to skip that check
+   */
+  private void assertNoExceptions(PuppyManager puppyManager, AtomicInteger globalChangeCounter) {
+    int totalUnhandledErrors = 0;
+    int totalEventChanges = 0;
+
+    // Add all change counters and compare with event change listener
+    for (AbstractPuppy puppy : puppyManager.getPuppies()) {
+      int puppyEventChanges = 0;
+      for (int changes : puppy._eventChangeCounterMap.values()) {
+        puppyEventChanges += changes;
+      }
+
+      System.out.println("Change counter: " + puppyEventChanges + " for " + puppy.getClass());
+      System.out.println("Error counter: " + puppy._unhandledErrorCounter + " for " + puppy.getClass());
+      totalUnhandledErrors += puppy._unhandledErrorCounter;
+      totalEventChanges += puppyEventChanges;
+    }
+
+    // Assert no unhandled (unexpected) exceptions
+    Assert.assertEquals(totalUnhandledErrors, 0);
+
+    // Assert that the listener placed on the test parent node (/test) caught all successful
+    // changes that were recorded by each puppy
+    if (globalChangeCounter != null) {
+      Assert.assertEquals(totalEventChanges, globalChangeCounter.get());
+    }
+  }
+}
diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestMultiThreadStressTest/UpdatePuppy.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestMultiThreadStressTest/UpdatePuppy.java
new file mode 100644
index 000000000..721d507eb
--- /dev/null
+++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestMultiThreadStressTest/UpdatePuppy.java
@@ -0,0 +1,74 @@
+package org.apache.helix.metaclient.impl.zk.TestMultiThreadStressTest;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.metaclient.api.MetaClientInterface;
+import org.apache.helix.metaclient.exception.MetaClientNoNodeException;
+import org.apache.helix.metaclient.puppy.AbstractPuppy;
+import org.apache.helix.metaclient.puppy.PuppySpec;
+
+import java.util.Random;
+
+/**
+ * Stress-test puppy that updates the data of child nodes under {@code /test}.
+ *
+ * <p>Each bark targets a random child name in {@code [0, PuppySpec#getNumberDiffPaths())}.
+ * With probability {@code PuppySpec#getErrorRate()} it instead calls update on a malformed
+ * path and expects an {@link IllegalArgumentException}.
+ */
+public class UpdatePuppy extends AbstractPuppy {
+
+  private final Random _random;
+  private final String _parentPath = "/test";
+
+  public UpdatePuppy(MetaClientInterface<String> metaclient, PuppySpec puppySpec) {
+    super(metaclient, puppySpec);
+    _random = new Random();
+  }
+
+  /**
+   * Performs one update attempt, either a deliberately invalid one or a normal one.
+   *
+   * <p>A successful update is recorded in {@code _eventChangeCounterMap} under the child's
+   * name. A {@link MetaClientNoNodeException} is expected when the node is absent and is only
+   * logged.
+   *
+   * @throws Exception any unexpected failure, including an {@link IllegalArgumentException}
+   *     other than the one-time-watcher message tolerated below
+   */
+  @Override
+  protected void bark() throws Exception {
+    int randomNumber = _random.nextInt(_puppySpec.getNumberDiffPaths());
+    if (shouldIntroduceError()) {
+      try {
+        _metaclient.update("invalid", (data) -> "foo");
+        // Reaching this line means the invalid path was accepted. Count it as an unhandled
+        // error, the same way DeletePuppy and GetPuppy do, so the test can notice.
+        _unhandledErrorCounter++;
+      } catch (IllegalArgumentException e) {
+        System.out.println(Thread.currentThread().getName() + " intentionally tried to update an invalid path" + " at time: " + System.currentTimeMillis());
+      }
+    } else {
+      try {
+        System.out.println(Thread.currentThread().getName() + " is attempting to update node: " + randomNumber + " at time: " + System.currentTimeMillis());
+        _metaclient.update(_parentPath + "/" + randomNumber, (data) -> "foo");
+        _eventChangeCounterMap.merge(String.valueOf(randomNumber), 1, Integer::sum);
+        System.out.println(Thread.currentThread().getName() + " successfully updated node " + randomNumber + " at time: "
+            + System.currentTimeMillis());
+      } catch (MetaClientNoNodeException e) {
+        System.out.println(Thread.currentThread().getName() + " failed to update node " + randomNumber + " at time: " + System.currentTimeMillis() + ", it does not exist");
+      } catch (IllegalArgumentException e) {
+        // This one message is tolerated silently; anything else is rethrown.
+        // NOTE(review): presumably raised when a persistent listener is registered - confirm.
+        // The literal goes first so a null exception message cannot cause an NPE here.
+        if (!"Can not subscribe one time watcher when ZkClient is using PersistWatcher".equals(e.getMessage())) {
+          throw e;
+        }
+      }
+    }
+  }
+
+  /** Removes the shared parent node and everything below it. */
+  @Override
+  protected void cleanup() {
+    _metaclient.recursiveDelete(_parentPath);
+  }
+
+  /** Returns {@code true} with probability {@code PuppySpec#getErrorRate()}. */
+  private boolean shouldIntroduceError() {
+    float randomValue = _random.nextFloat();
+    return randomValue < _puppySpec.getErrorRate();
+  }
+}
diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/puppy/AbstractPuppy.java b/meta-client/src/test/java/org/apache/helix/metaclient/puppy/AbstractPuppy.java
index 85137fc17..bfbbb915d 100644
--- a/meta-client/src/test/java/org/apache/helix/metaclient/puppy/AbstractPuppy.java
+++ b/meta-client/src/test/java/org/apache/helix/metaclient/puppy/AbstractPuppy.java
@@ -29,8 +29,9 @@ public abstract class AbstractPuppy implements Runnable {
protected MetaClientInterface<String> _metaclient;
protected PuppySpec _puppySpec;
- public HashMap<String, Integer> _eventChangeCounterMap;
- protected int _unhandledErrorCounter;
+ public final HashMap<String, Integer> _eventChangeCounterMap;
+ public int _unhandledErrorCounter;
+
public AbstractPuppy(MetaClientInterface<String> metaclient, PuppySpec puppySpec) {
_metaclient = metaclient;
@@ -69,6 +70,8 @@ public abstract class AbstractPuppy implements Runnable {
try {
Thread.sleep(getPuppySpec().getExecDelay().getNextDelay());
} catch (InterruptedException e) {
+ cleanup();
+ break;
// Handle interruption if necessary
}
}