You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by xy...@apache.org on 2023/07/20 17:05:42 UTC

[helix] branch metaclient updated: Multithreading stress test lattice - CRUD puppies and Listener Tests (#2548)

This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch metaclient
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/metaclient by this push:
     new 9f36c1062 Multithreading stress test lattice - CRUD puppies and Listener Tests (#2548)
9f36c1062 is described below

commit 9f36c1062743664175118b3b4698aa73600b09d4
Author: Marcos Rico Peng <55...@users.noreply.github.com>
AuthorDate: Thu Jul 20 13:05:37 2023 -0400

    Multithreading stress test lattice - CRUD puppies and Listener Tests (#2548)
    
    
    ---------
    
    Co-authored-by: mapeng <ma...@linkedin.com>
---
 .../helix/metaclient/impl/zk/ZkMetaClient.java     |   6 +-
 .../zk/TestMultiThreadStressTest/CreatePuppy.java  |  78 +++++
 .../zk/TestMultiThreadStressTest/DeletePuppy.java  |  67 ++++
 .../zk/TestMultiThreadStressTest/GetPuppy.java     |  68 ++++
 .../zk/TestMultiThreadStressTest/SetPuppy.java     |  71 +++++
 .../TestMultiThreadStressZKClient.java             | 352 +++++++++++++++++++++
 .../zk/TestMultiThreadStressTest/UpdatePuppy.java  |  74 +++++
 .../helix/metaclient/puppy/AbstractPuppy.java      |   7 +-
 8 files changed, 717 insertions(+), 6 deletions(-)

diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
index 7f68ec9ca..691f31cde 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
@@ -101,9 +101,7 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
     try {
       create(key, data, EntryMode.PERSISTENT);
     } catch (ZkException e) {
-      throw ZkMetaClientUtil.translateZkExceptionToMetaclientException(e);
-    } catch (Exception e) {
-      throw new MetaClientException(e);
+      throw translateZkExceptionToMetaclientException(e);
     }
   }
 
@@ -113,7 +111,7 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
     try {
       _zkClient.create(key, data, ZkMetaClientUtil.convertMetaClientMode(mode));
     } catch (ZkException e) {
-      throw ZkMetaClientUtil.translateZkExceptionToMetaclientException(e);
+      throw translateZkExceptionToMetaclientException(e);
     } catch (KeeperException e) {
       throw new MetaClientException(e);
     }
diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestMultiThreadStressTest/CreatePuppy.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestMultiThreadStressTest/CreatePuppy.java
new file mode 100644
index 000000000..3e28df06b
--- /dev/null
+++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestMultiThreadStressTest/CreatePuppy.java
@@ -0,0 +1,78 @@
+package org.apache.helix.metaclient.impl.zk.TestMultiThreadStressTest;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.metaclient.api.MetaClientInterface;
+import org.apache.helix.metaclient.exception.MetaClientNodeExistsException;
+import org.apache.helix.metaclient.puppy.AbstractPuppy;
+import org.apache.helix.metaclient.puppy.PuppySpec;
+
+import java.util.Random;
+
+/** Stress-test puppy that creates randomly chosen children under {@code /test}. */
+public class CreatePuppy extends AbstractPuppy {
+
+  private final Random _random;
+  private final String _parentPath = "/test";
+
+  public CreatePuppy(MetaClientInterface<String> metaclient, PuppySpec puppySpec) {
+    super(metaclient, puppySpec);
+    _random = new Random();
+  }
+
+  /** Creates a random child, or (at the spec's error rate) attempts an invalid path. */
+  @Override
+  protected void bark() {
+    int randomNumber = _random.nextInt(_puppySpec.getNumberDiffPaths());
+    if (shouldIntroduceError()) {
+      try {
+        // Simulate an error by creating an invalid path
+        _metaclient.create("invalid", "test");
+        _unhandledErrorCounter++; // the invalid path was accepted instead of rejected
+      } catch (IllegalArgumentException e) { // expected: the invalid path is rejected
+        System.out.println(Thread.currentThread().getName() + " tried to create an invalid path" + " at time: " + System.currentTimeMillis());
+      }
+    } else {
+      try {
+        System.out.println(Thread.currentThread().getName() + " is attempting to create node: " + randomNumber + " at time: " + System.currentTimeMillis());
+        _metaclient.create(_parentPath + "/" + randomNumber, "test");
+        System.out.println(Thread.currentThread().getName() + " successfully created node " + randomNumber + " at time: " + System.currentTimeMillis());
+        // Count only creates that actually succeeded, keyed by child name.
+        _eventChangeCounterMap.merge(String.valueOf(randomNumber), 1, Integer::sum);
+      } catch (MetaClientNodeExistsException e) { // expected when the child already exists
+        System.out.println(Thread.currentThread().getName() + " failed to create node " + randomNumber + " at time: " + System.currentTimeMillis() + ", it already exists");
+      }
+    }
+  }
+
+  /** Recursively deletes {@code /test}. */
+  @Override
+  protected void cleanup() {
+    _metaclient.recursiveDelete(_parentPath);
+  }
+
+  /** Returns true when a random draw in [0, 1) is below the spec's error rate. */
+  private boolean shouldIntroduceError() {
+    return _random.nextFloat() < _puppySpec.getErrorRate();
+  }
+}
+
+
+
diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestMultiThreadStressTest/DeletePuppy.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestMultiThreadStressTest/DeletePuppy.java
new file mode 100644
index 000000000..e0e1b7b5c
--- /dev/null
+++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestMultiThreadStressTest/DeletePuppy.java
@@ -0,0 +1,67 @@
+package org.apache.helix.metaclient.impl.zk.TestMultiThreadStressTest;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.metaclient.api.MetaClientInterface;
+import org.apache.helix.metaclient.puppy.AbstractPuppy;
+import org.apache.helix.metaclient.puppy.PuppySpec;
+
+import java.util.Random;
+
+public class DeletePuppy extends AbstractPuppy {
+
+  private final Random _random;
+  private final String _parentPath = "/test";
+
+  public DeletePuppy(MetaClientInterface<String> metaclient, PuppySpec puppySpec) {
+    super(metaclient, puppySpec);
+    _random = new Random();
+  }
+
+  @Override
+  protected void bark() {
+    int randomNumber = _random.nextInt(_puppySpec.getNumberDiffPaths());
+    if (shouldIntroduceError()) {
+      try {
+        _metaclient.delete("invalid");
+        _unhandledErrorCounter++;
+      } catch (IllegalArgumentException e) {
+        System.out.println(Thread.currentThread().getName() + " intentionally deleted an invalid path" + " at time: " + System.currentTimeMillis() );
+      }
+    } else {
+      System.out.println(Thread.currentThread().getName() + " is attempting to delete node: " + randomNumber + " at time: " + System.currentTimeMillis());
+      if (_metaclient.delete(_parentPath + "/" + randomNumber)) {
+        System.out.println(Thread.currentThread().getName() + " successfully deleted node " + randomNumber + " at time: " + System.currentTimeMillis());
+        _eventChangeCounterMap.put(String.valueOf(randomNumber), _eventChangeCounterMap.getOrDefault(String.valueOf(randomNumber), 0) + 1);
+      } else {
+        System.out.println(Thread.currentThread().getName() + " failed to delete node " + randomNumber + " at time: " + System.currentTimeMillis() + ", it does not exist");
+      }
+    }
+  }
+
+  @Override
+  protected void cleanup() {
+    _metaclient.recursiveDelete(_parentPath);
+  }
+
+  private boolean shouldIntroduceError() {
+    return _random.nextFloat() < _puppySpec.getErrorRate();
+  }
+}
\ No newline at end of file
diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestMultiThreadStressTest/GetPuppy.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestMultiThreadStressTest/GetPuppy.java
new file mode 100644
index 000000000..fe24b2bd3
--- /dev/null
+++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestMultiThreadStressTest/GetPuppy.java
@@ -0,0 +1,68 @@
+package org.apache.helix.metaclient.impl.zk.TestMultiThreadStressTest;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.metaclient.api.MetaClientInterface;
+import org.apache.helix.metaclient.puppy.AbstractPuppy;
+import org.apache.helix.metaclient.puppy.PuppySpec;
+
+import java.util.Objects;
+import java.util.Random;
+
+public class GetPuppy extends AbstractPuppy {
+
+  private final Random _random;
+  private final String _parentPath = "/test";
+
+  public GetPuppy(MetaClientInterface<String> metaclient, PuppySpec puppySpec) {
+    super(metaclient, puppySpec);
+    _random = new Random();
+  }
+
+  @Override
+  protected void bark() {
+    int randomNumber = _random.nextInt(_puppySpec.getNumberDiffPaths());
+    if (shouldIntroduceError()) {
+      try {
+        _metaclient.get("invalid");
+        _unhandledErrorCounter++;
+      } catch (IllegalArgumentException e) {
+        System.out.println(Thread.currentThread().getName() + " intentionally tried to read an invalid path" + " at time: " + System.currentTimeMillis());
+      }
+    } else {
+      System.out.println(Thread.currentThread().getName() + " is attempting to read node: " + randomNumber + " at time: " + System.currentTimeMillis());
+      String nodeValue = _metaclient.get(_parentPath + "/" + randomNumber);
+      if (Objects.equals(nodeValue, null)) {
+        System.out.println(Thread.currentThread().getName() + " failed to read node " + randomNumber + " at time: " + System.currentTimeMillis() + ", it does not exist");
+      } else {
+        System.out.println(Thread.currentThread().getName() + " successfully read node " + randomNumber + " at time: " + System.currentTimeMillis());
+      }
+    }
+  }
+
+  @Override
+  protected void cleanup() {
+    _metaclient.recursiveDelete(_parentPath);
+  }
+
+  private boolean shouldIntroduceError() {
+    return _random.nextFloat() < _puppySpec.getErrorRate();
+  }
+}
diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestMultiThreadStressTest/SetPuppy.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestMultiThreadStressTest/SetPuppy.java
new file mode 100644
index 000000000..c0de4ece7
--- /dev/null
+++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestMultiThreadStressTest/SetPuppy.java
@@ -0,0 +1,71 @@
+package org.apache.helix.metaclient.impl.zk.TestMultiThreadStressTest;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.metaclient.api.MetaClientInterface;
+import org.apache.helix.metaclient.exception.MetaClientNoNodeException;
+import org.apache.helix.metaclient.puppy.AbstractPuppy;
+import org.apache.helix.metaclient.puppy.PuppySpec;
+
+import java.util.Random;
+
+/** Stress-test puppy that overwrites randomly chosen children under {@code /test}. */
+public class SetPuppy extends AbstractPuppy {
+  private final Random _random;
+  private final String _parentPath = "/test";
+
+  public SetPuppy(MetaClientInterface<String> metaclient, PuppySpec puppySpec) {
+    super(metaclient, puppySpec);
+    _random = new Random();
+  }
+
+  /** Sets a random child's data, or (at the spec's error rate) targets an invalid path. */
+  @Override
+  protected void bark() {
+    int randomNumber = _random.nextInt(_puppySpec.getNumberDiffPaths());
+    if (shouldIntroduceError()) {
+      try {
+        _metaclient.set("invalid", "test", -1);
+        _unhandledErrorCounter++; // the invalid path was accepted instead of rejected
+      } catch (IllegalArgumentException e) { // expected: the invalid path is rejected
+        System.out.println(Thread.currentThread().getName() + " intentionally called set on an invalid path" + " at time: " + System.currentTimeMillis());
+      }
+    } else {
+      try {
+        System.out.println(Thread.currentThread().getName() + " is attempting to set node: " + randomNumber + " at time: " + System.currentTimeMillis());
+        _metaclient.set(_parentPath + "/" + randomNumber, "test", -1);
+        _eventChangeCounterMap.merge(String.valueOf(randomNumber), 1, Integer::sum); // successful sets only
+        System.out.println(Thread.currentThread().getName() + " successfully set node " + randomNumber + " at time: " + System.currentTimeMillis());
+      } catch (MetaClientNoNodeException e) { // expected when the child does not exist
+        System.out.println(Thread.currentThread().getName() + " failed to set node " + randomNumber + " at time: " + System.currentTimeMillis() + ", it does not exist");
+      }
+    }
+  }
+
+  /** Recursively deletes {@code /test}. */
+  @Override
+  protected void cleanup() {
+    _metaclient.recursiveDelete(_parentPath);
+  }
+
+  private boolean shouldIntroduceError() {
+    return _random.nextFloat() < _puppySpec.getErrorRate();
+  }
+}
diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestMultiThreadStressTest/TestMultiThreadStressZKClient.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestMultiThreadStressTest/TestMultiThreadStressZKClient.java
new file mode 100644
index 000000000..5ee026bc1
--- /dev/null
+++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestMultiThreadStressTest/TestMultiThreadStressZKClient.java
@@ -0,0 +1,352 @@
+package org.apache.helix.metaclient.impl.zk.TestMultiThreadStressTest;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.metaclient.api.ChildChangeListener;
+import org.apache.helix.metaclient.impl.zk.ZkMetaClient;
+import org.apache.helix.metaclient.impl.zk.ZkMetaClientTestBase;
+import org.apache.helix.metaclient.puppy.ExecDelay;
+import org.apache.helix.metaclient.puppy.PuppyManager;
+import org.apache.helix.metaclient.puppy.PuppyMode;
+import org.apache.helix.metaclient.puppy.PuppySpec;
+import org.apache.helix.metaclient.puppy.AbstractPuppy;
+import org.testng.Assert;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class TestMultiThreadStressZKClient extends ZkMetaClientTestBase {
+
+  private ZkMetaClient<String> _zkMetaClient;
+  private final String zkParentKey = "/test";
+
+  private final long TIMEOUT = 60; // The desired timeout duration of tests in seconds
+
+  @BeforeTest
+  private void setUp() {
+    _zkMetaClient = createZkMetaClient(); // client used by the tests in this class
+    _zkMetaClient.connect();
+  }
+
+  /** Runs three CreatePuppy instances against {@code /test} and expects no unhandled errors. */
+  @Test
+  public void testCreatePuppy() {
+    _zkMetaClient.create(zkParentKey, "test");
+    try {
+      PuppySpec puppySpec = new PuppySpec(PuppyMode.REPEAT, 0.2f, new ExecDelay(5000, 0.1f), 5);
+
+      PuppyManager puppyManager = new PuppyManager();
+      // All three creators share one client and one spec.
+      puppyManager.addPuppy(new CreatePuppy(_zkMetaClient, puppySpec));
+      puppyManager.addPuppy(new CreatePuppy(_zkMetaClient, puppySpec));
+      puppyManager.addPuppy(new CreatePuppy(_zkMetaClient, puppySpec));
+
+      puppyManager.start(TIMEOUT);
+
+      assertNoExceptions(puppyManager, null);
+    } finally {
+      // Always remove /test so a failure here cannot make a later test's create() fail.
+      _zkMetaClient.recursiveDelete(zkParentKey);
+    }
+    Assert.assertEquals(_zkMetaClient.countDirectChildren(zkParentKey), 0);
+  }
+
+  /** Runs a CreatePuppy and a DeletePuppy together and expects no unhandled errors. */
+  @Test
+  public void testDeletePuppy() {
+    _zkMetaClient.create(zkParentKey, "test");
+    try {
+      PuppySpec puppySpec = new PuppySpec(PuppyMode.REPEAT, 0.2f, new ExecDelay(5000, 0.1f), 5);
+
+      PuppyManager puppyManager = new PuppyManager();
+      puppyManager.addPuppy(new CreatePuppy(_zkMetaClient, puppySpec));
+      puppyManager.addPuppy(new DeletePuppy(_zkMetaClient, puppySpec));
+
+      puppyManager.start(TIMEOUT);
+
+      assertNoExceptions(puppyManager, null);
+    } finally {
+      // Always remove /test so a failure here cannot make a later test's create() fail.
+      _zkMetaClient.recursiveDelete(zkParentKey);
+    }
+    Assert.assertEquals(_zkMetaClient.countDirectChildren(zkParentKey), 0);
+  }
+
+  /** Runs a CreatePuppy and a GetPuppy together and expects no unhandled errors. */
+  @Test
+  public void testGetPuppy() {
+    _zkMetaClient.create(zkParentKey, "test");
+    try {
+      PuppySpec puppySpec = new PuppySpec(PuppyMode.REPEAT, 0.2f, new ExecDelay(5000, 0.1f), 5);
+
+      PuppyManager puppyManager = new PuppyManager();
+      puppyManager.addPuppy(new CreatePuppy(_zkMetaClient, puppySpec));
+      puppyManager.addPuppy(new GetPuppy(_zkMetaClient, puppySpec));
+
+      puppyManager.start(TIMEOUT);
+
+      assertNoExceptions(puppyManager, null);
+    } finally {
+      // Always remove /test so a failure here cannot make a later test's create() fail.
+      _zkMetaClient.recursiveDelete(zkParentKey);
+    }
+    Assert.assertEquals(_zkMetaClient.countDirectChildren(zkParentKey), 0);
+  }
+
+  /** Runs a CreatePuppy and a SetPuppy together and expects no unhandled errors. */
+  @Test
+  public void testSetPuppy() {
+    _zkMetaClient.create(zkParentKey, "test");
+    try {
+      PuppySpec puppySpec = new PuppySpec(PuppyMode.REPEAT, 0.2f, new ExecDelay(5000, 0.1f), 5);
+
+      PuppyManager puppyManager = new PuppyManager();
+      puppyManager.addPuppy(new CreatePuppy(_zkMetaClient, puppySpec));
+      puppyManager.addPuppy(new SetPuppy(_zkMetaClient, puppySpec));
+
+      puppyManager.start(TIMEOUT);
+
+      assertNoExceptions(puppyManager, null);
+    } finally {
+      // Always remove /test so a failure here cannot make a later test's create() fail.
+      _zkMetaClient.recursiveDelete(zkParentKey);
+    }
+    Assert.assertEquals(_zkMetaClient.countDirectChildren(zkParentKey), 0);
+  }
+
+  /** Runs a CreatePuppy and an UpdatePuppy together and expects no unhandled errors. */
+  @Test
+  public void testUpdatePuppy() {
+    _zkMetaClient.create(zkParentKey, "test");
+    try {
+      PuppySpec puppySpec = new PuppySpec(PuppyMode.REPEAT, 0.2f, new ExecDelay(5000, 0.1f), 5);
+
+      PuppyManager puppyManager = new PuppyManager();
+      puppyManager.addPuppy(new CreatePuppy(_zkMetaClient, puppySpec));
+      puppyManager.addPuppy(new UpdatePuppy(_zkMetaClient, puppySpec));
+
+      puppyManager.start(TIMEOUT);
+
+      assertNoExceptions(puppyManager, null);
+    } finally {
+      // Always remove /test so a failure here cannot make a later test's create() fail.
+      _zkMetaClient.recursiveDelete(zkParentKey);
+    }
+    Assert.assertEquals(_zkMetaClient.countDirectChildren(zkParentKey), 0);
+  }
+
+  /** Runs one puppy of each CRUD kind together and expects no unhandled errors. */
+  @Test
+  public void testCrudPuppies() {
+    _zkMetaClient.create(zkParentKey, "test");
+    try {
+      PuppySpec puppySpec = new PuppySpec(PuppyMode.REPEAT, 0.2f, new ExecDelay(5000, 0.1f), 5);
+
+      PuppyManager puppyManager = new PuppyManager();
+      // One puppy per operation: create, get, delete, set, update.
+      puppyManager.addPuppy(new CreatePuppy(_zkMetaClient, puppySpec));
+      puppyManager.addPuppy(new GetPuppy(_zkMetaClient, puppySpec));
+      puppyManager.addPuppy(new DeletePuppy(_zkMetaClient, puppySpec));
+      puppyManager.addPuppy(new SetPuppy(_zkMetaClient, puppySpec));
+      puppyManager.addPuppy(new UpdatePuppy(_zkMetaClient, puppySpec));
+
+      puppyManager.start(TIMEOUT);
+
+      assertNoExceptions(puppyManager, null);
+    } finally {
+      // Always remove /test, even when an assertion above fails, so that a later
+      // test's create() of the same node does not fail.
+      _zkMetaClient.recursiveDelete(zkParentKey);
+    }
+
+    Assert.assertEquals(_zkMetaClient.countDirectChildren(zkParentKey), 0);
+  }
+
+
+  @Test
+  public void testBasicParentListenerPuppy() {
+    _zkMetaClient.create(zkParentKey, "test");
+    AtomicInteger globalChildChangeCounter = new AtomicInteger();
+    ChildChangeListener childChangeListener = (changedPath, changeType) -> {
+      globalChildChangeCounter.addAndGet(1);
+      System.out.println("-------------- Child change detected: " + changeType + " at path: " + changedPath + " number of changes: " + globalChildChangeCounter.get());
+    };
+
+    _zkMetaClient.subscribeChildChanges(zkParentKey, childChangeListener, false);
+
+    PuppySpec puppySpec = new PuppySpec(PuppyMode.REPEAT, 0.2f, new ExecDelay(5000, 0.1f), 5);
+    CreatePuppy createPuppy = new CreatePuppy(_zkMetaClient, puppySpec);
+
+    PuppyManager puppyManager = new PuppyManager();
+    puppyManager.addPuppy(createPuppy);
+
+    puppyManager.start(TIMEOUT);
+
+    assertNoExceptions(puppyManager, globalChildChangeCounter);
+
+    // cleanup
+    _zkMetaClient.unsubscribeChildChanges(zkParentKey, childChangeListener);
+    _zkMetaClient.recursiveDelete(zkParentKey);
+    Assert.assertEquals(_zkMetaClient.countDirectChildren(zkParentKey), 0);
+  }
+
+  @Test
+  public void testComplexParentListenerPuppy() {
+    _zkMetaClient.create(zkParentKey, "test");
+    // Global counter for all child changes
+    AtomicInteger globalChildChangeCounter = new AtomicInteger();
+    ChildChangeListener childChangeListener = (changedPath, changeType) -> {
+      globalChildChangeCounter.addAndGet(1);
+      System.out.println("-------------- Child change detected: " + changeType + " at path: " + changedPath + " number of changes: " + globalChildChangeCounter.get());
+    };
+
+
+    _zkMetaClient.subscribeChildChanges(zkParentKey, childChangeListener, false);
+
+    PuppySpec puppySpec = new PuppySpec(PuppyMode.REPEAT, 0.2f, new ExecDelay(5000, 0.1f), 5);
+    CreatePuppy createPuppy = new CreatePuppy(_zkMetaClient, puppySpec);
+    GetPuppy getPuppy = new GetPuppy(_zkMetaClient, puppySpec);
+    DeletePuppy deletePuppy = new DeletePuppy(_zkMetaClient, puppySpec);
+    SetPuppy setPuppy = new SetPuppy(_zkMetaClient, puppySpec);
+    UpdatePuppy updatePuppy = new UpdatePuppy(_zkMetaClient, puppySpec);
+
+    PuppyManager puppyManager = new PuppyManager();
+    puppyManager.addPuppy(createPuppy);
+    puppyManager.addPuppy(getPuppy);
+    puppyManager.addPuppy(deletePuppy);
+    puppyManager.addPuppy(setPuppy);
+    puppyManager.addPuppy(updatePuppy);
+
+    puppyManager.start(TIMEOUT);
+
+    assertNoExceptions(puppyManager, globalChildChangeCounter);
+
+    // cleanup
+    _zkMetaClient.recursiveDelete(zkParentKey);
+    Assert.assertEquals(_zkMetaClient.countDirectChildren(zkParentKey), 0);
+    _zkMetaClient.unsubscribeChildChanges(zkParentKey, childChangeListener);
+    _zkMetaClient.delete(zkParentKey);
+  }
+
+
+  @Test
+  public void testChildListenerPuppy() {
+    _zkMetaClient.create(zkParentKey, "test");
+    // Setting num diff paths to 3 until we find a better way of scaling listeners.
+    PuppySpec puppySpec = new PuppySpec(PuppyMode.REPEAT, 0.2f, new ExecDelay(5000, 0.1f), 3);
+    CreatePuppy createPuppy = new CreatePuppy(_zkMetaClient, puppySpec);
+    GetPuppy getPuppy = new GetPuppy(_zkMetaClient, puppySpec);
+    DeletePuppy deletePuppy = new DeletePuppy(_zkMetaClient, puppySpec);
+    SetPuppy setPuppy = new SetPuppy(_zkMetaClient, puppySpec);
+    UpdatePuppy updatePuppy = new UpdatePuppy(_zkMetaClient, puppySpec);
+
+    PuppyManager puppyManager = new PuppyManager();
+    puppyManager.addPuppy(createPuppy);
+    puppyManager.addPuppy(getPuppy);
+    puppyManager.addPuppy(deletePuppy);
+    puppyManager.addPuppy(setPuppy);
+    puppyManager.addPuppy(updatePuppy);
+
+    // Create a child listener for each child defined in number diff paths in puppyspec.
+    // TODO: Make this a parameter for a loop.
+    AtomicInteger childChangeCounter0 = new AtomicInteger();
+    ChildChangeListener childChangeListener0 = (changedPath, changeType) -> {
+      childChangeCounter0.addAndGet(1);
+      System.out.println("-------------- Child change detected: " + changeType + " at path: " + changedPath + " number of changes: " + childChangeCounter0.get());
+    };
+    _zkMetaClient.subscribeChildChanges("/test/0", childChangeListener0, false);
+
+    AtomicInteger childChangeCounter1 = new AtomicInteger();
+    ChildChangeListener childChangeListener1 = (changedPath, changeType) -> {
+      childChangeCounter1.addAndGet(1);
+      System.out.println("-------------- Child change detected: " + changeType + " at path: " + changedPath + " number of changes: " + childChangeCounter1.get());
+    };
+    _zkMetaClient.subscribeChildChanges("/test/1", childChangeListener1, false);
+
+    AtomicInteger childChangeCounter2 = new AtomicInteger();
+    ChildChangeListener childChangeListener2 = (changedPath, changeType) -> {
+      childChangeCounter2.addAndGet(1);
+      System.out.println("-------------- Child change detected: " + changeType + " at path: " + changedPath + " number of changes: " + childChangeCounter2.get());
+    };
+    _zkMetaClient.subscribeChildChanges("/test/2", childChangeListener2, false);
+
+    puppyManager.start(TIMEOUT);
+
+    assertNoExceptions(puppyManager, null);
+
+    // Add all event changes from all puppies and compare with child change listener
+    // Inner merged by path
+    Map<String, Integer> mergedEventChangeCounterMap = new HashMap<>();
+    for (AbstractPuppy puppy : puppyManager.getPuppies()) {
+      puppy._eventChangeCounterMap.forEach((key, value) -> {
+        if (mergedEventChangeCounterMap.containsKey(key)) {
+          mergedEventChangeCounterMap.put(key, mergedEventChangeCounterMap.get(key) + value);
+        } else {
+          mergedEventChangeCounterMap.put(key, value);
+        }
+      });
+    }
+
+    System.out.println("Merged event change counter map: " + mergedEventChangeCounterMap);
+    System.out.println("Child change counter 0: " + childChangeCounter0);
+    System.out.println("Child change counter 1: " + childChangeCounter1);
+    System.out.println("Child change counter 2: " + childChangeCounter2);
+    Assert.assertEquals(childChangeCounter0.get(), mergedEventChangeCounterMap.getOrDefault("0", 0).intValue());
+    Assert.assertEquals(childChangeCounter1.get(), mergedEventChangeCounterMap.getOrDefault("1", 0).intValue());
+    Assert.assertEquals(childChangeCounter2.get(), mergedEventChangeCounterMap.getOrDefault("2", 0).intValue());
+
+    // cleanup
+    _zkMetaClient.unsubscribeChildChanges("/test/0", childChangeListener0);
+    _zkMetaClient.unsubscribeChildChanges("/test/1", childChangeListener1);
+    _zkMetaClient.unsubscribeChildChanges("/test/2", childChangeListener2);
+    _zkMetaClient.recursiveDelete(zkParentKey);
+    Assert.assertEquals(_zkMetaClient.countDirectChildren(zkParentKey), 0);
+  }
+
+  /**
+   * Sums each puppy's recorded change events and unhandled errors, then asserts that no
+   * unhandled errors occurred and, if a counter is given, that it equals the event total.
+   *
+   * @param puppyManager manager whose puppies are inspected
+   * @param globalChangeCounter listener-side change count to compare against, or null to skip
+   */
+  private void assertNoExceptions(PuppyManager puppyManager, AtomicInteger globalChangeCounter) {
+    int totalUnhandledErrors = 0;
+    int totalEventChanges = 0;
+    for (AbstractPuppy puppy : puppyManager.getPuppies()) {
+      int puppyEventChanges = 0;
+      for (int changes : puppy._eventChangeCounterMap.values()) {
+        puppyEventChanges += changes;
+      }
+      System.out.println("Change counter: " + puppyEventChanges + " for " + puppy.getClass());
+      System.out.println("Error counter: " + puppy._unhandledErrorCounter + " for " + puppy.getClass());
+      totalUnhandledErrors += puppy._unhandledErrorCounter;
+      totalEventChanges += puppyEventChanges;
+    }
+    Assert.assertEquals(totalUnhandledErrors, 0);
+    // The listener-side count must match the number of changes the puppies recorded.
+    if (globalChangeCounter != null) {
+      Assert.assertEquals(totalEventChanges, globalChangeCounter.get());
+    }
+  }
+}
diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestMultiThreadStressTest/UpdatePuppy.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestMultiThreadStressTest/UpdatePuppy.java
new file mode 100644
index 000000000..721d507eb
--- /dev/null
+++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestMultiThreadStressTest/UpdatePuppy.java
@@ -0,0 +1,74 @@
+package org.apache.helix.metaclient.impl.zk.TestMultiThreadStressTest;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.metaclient.api.MetaClientInterface;
+import org.apache.helix.metaclient.exception.MetaClientNoNodeException;
+import org.apache.helix.metaclient.puppy.AbstractPuppy;
+import org.apache.helix.metaclient.puppy.PuppySpec;
+
+import java.util.Random;
+
+/** Puppy that updates a random child of /test, or the path "invalid" at the spec's error rate. */
+public class UpdatePuppy extends AbstractPuppy {
+  private final Random _random;
+  private final String _parentPath = "/test";
+
+  public UpdatePuppy(MetaClientInterface<String> metaclient, PuppySpec puppySpec) {
+    super(metaclient, puppySpec);
+    _random = new Random();
+  }
+
+  /** Runs one update attempt and counts it per node only when the update succeeds. */
+  @Override
+  protected void bark() throws Exception {
+    int randomNumber = _random.nextInt(_puppySpec.getNumberDiffPaths());
+    if (shouldIntroduceError()) {
+      try {
+        _metaclient.update("invalid", (data) -> "foo");
+      } catch (IllegalArgumentException e) {
+        System.out.println(Thread.currentThread().getName() + " intentionally tried to update an invalid path" + " at time: " + System.currentTimeMillis());
+      }
+      return;
+    }
+    try {
+      System.out.println(Thread.currentThread().getName() + " is attempting to update node: " + randomNumber + " at time: " + System.currentTimeMillis());
+      _metaclient.update(_parentPath + "/" + randomNumber, (data) -> "foo");
+      _eventChangeCounterMap.merge(String.valueOf(randomNumber), 1, Integer::sum);
+      System.out.println(Thread.currentThread().getName() + " successfully updated node " + randomNumber + " at time: " + System.currentTimeMillis());
+    } catch (MetaClientNoNodeException e) {
+      System.out.println(Thread.currentThread().getName() + " failed to update node " + randomNumber + " at time: " + System.currentTimeMillis() + ", it does not exist");
+    } catch (IllegalArgumentException e) {
+      // Literal-first equals so a null message rethrows e instead of raising a NullPointerException
+      if (!"Can not subscribe one time watcher when ZkClient is using PersistWatcher".equals(e.getMessage())) {
+        throw e;
+      }
+    }
+  }
+
+  @Override
+  protected void cleanup() {
+    _metaclient.recursiveDelete(_parentPath);
+  }
+
+  private boolean shouldIntroduceError() {
+    return _random.nextFloat() < _puppySpec.getErrorRate();
+  }
+}
diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/puppy/AbstractPuppy.java b/meta-client/src/test/java/org/apache/helix/metaclient/puppy/AbstractPuppy.java
index 85137fc17..bfbbb915d 100644
--- a/meta-client/src/test/java/org/apache/helix/metaclient/puppy/AbstractPuppy.java
+++ b/meta-client/src/test/java/org/apache/helix/metaclient/puppy/AbstractPuppy.java
@@ -29,8 +29,9 @@ public abstract class AbstractPuppy implements Runnable {
 
   protected MetaClientInterface<String> _metaclient;
   protected PuppySpec _puppySpec;
-  public HashMap<String, Integer> _eventChangeCounterMap;
-  protected int _unhandledErrorCounter;
+  public final HashMap<String, Integer> _eventChangeCounterMap;
+  public int _unhandledErrorCounter;
+
 
   public AbstractPuppy(MetaClientInterface<String> metaclient, PuppySpec puppySpec) {
     _metaclient = metaclient;
@@ -69,6 +70,8 @@ public abstract class AbstractPuppy implements Runnable {
           try {
             Thread.sleep(getPuppySpec().getExecDelay().getNextDelay());
           } catch (InterruptedException e) {
+            cleanup();
+            break;
             // Handle interruption if necessary
           }
         }