You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/06/01 16:19:45 UTC

[pulsar] branch master updated: MockZooKeeper#failNow is unreliable (#7109)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7314ac2  MockZooKeeper#failNow is unreliable (#7109)
7314ac2 is described below

commit 7314ac2c290939975b827d8e8ec84a755108b6a1
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Jun 1 09:19:30 2020 -0700

    MockZooKeeper#failNow is unreliable (#7109)
    
    * MockZooKeeper#failNow is unreliable
    
    MockZooKeeper#failNow instructs the MockZooKeeper instance to fail
    the next call to ZooKeeper. In a multithreaded system with many things
    accessing ZooKeeper, using #failNow is unreliable, as a background
    thread could try to access ZK before the call that is actually under
    test accesses it.
    
    This change tightens the condition under which the failed ZK call can
    occur, by checking the operation type and path. This resolves a flake
    that was occurring in ZooKeeperSessionExpireRecoveryTest.
    
    * Fixed import missing after merge
    
    Co-authored-by: Ivan Kelly <ik...@splunk.com>
---
 .../bookkeeper/mledger/impl/ManagedCursorTest.java |   7 +-
 .../mledger/impl/ManagedLedgerErrorsTest.java      |  57 +++++++--
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java |   7 +-
 .../bookkeeper/mledger/impl/MetaStoreImplTest.java |  52 ++++----
 .../org/apache/pulsar/broker/admin/AdminTest.java  |  70 +++++++++--
 .../apache/pulsar/broker/admin/NamespacesTest.java |  82 +++++++++++--
 .../broker/namespace/OwnershipCacheTest.java       |   9 +-
 .../ZooKeeperSessionExpireRecoveryTest.java        |  18 +--
 .../discovery/service/DiscoveryServiceTest.java    |   6 +-
 .../zookeeper/ZooKeeperSessionWatcherTest.java     |   1 -
 .../pulsar/zookeeper/ZookeeperCacheTest.java       |  25 +++-
 .../java/org/apache/zookeeper/MockZooKeeper.java   | 131 +++++++++++++--------
 12 files changed, 339 insertions(+), 126 deletions(-)

diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 294b806..e3ba438 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -82,6 +82,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import org.apache.pulsar.metadata.api.Stat;
 import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.MockZooKeeper;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
@@ -1168,7 +1169,11 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
         ManagedLedger ledger = factory.open("my_test_ledger");
 
         bkc.failAfter(1, BKException.Code.NotEnoughBookiesException);
-        zkc.failNow(Code.SESSIONEXPIRED);
+        zkc.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
+                return path.equals("/managed-ledgers/my_test_ledger/c1")
+                    && op == MockZooKeeper.Op.CREATE;
+            });
+
         try {
             ledger.openCursor("c1");
             fail("should have failed");
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java
index 99b6bd5..977346a 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java
@@ -25,6 +25,7 @@ import static org.testng.Assert.fail;
 
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.api.DigestType;
@@ -41,6 +42,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.MockZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.Test;
@@ -54,7 +56,10 @@ public class ManagedLedgerErrorsTest extends MockedBookKeeperTestCase {
 
         assertNotNull(zkc.exists("/managed-ledgers/my_test_ledger/c1", false));
 
-        zkc.failNow(Code.BADVERSION);
+        zkc.failConditional(Code.BADVERSION, (op, path) -> {
+                return op == MockZooKeeper.Op.SET
+                    && path.equals("/managed-ledgers/my_test_ledger/c1");
+            });
 
         try {
             c1.close();
@@ -77,7 +82,10 @@ public class ManagedLedgerErrorsTest extends MockedBookKeeperTestCase {
         ManagedLedger ledger = factory.open("my_test_ledger");
         ledger.openCursor("c1");
 
-        zkc.failNow(Code.CONNECTIONLOSS);
+        zkc.failConditional(Code.CONNECTIONLOSS, (op, path) -> {
+                return op == MockZooKeeper.Op.DELETE
+                    && path.equals("/managed-ledgers/my_test_ledger/c1");
+            });
 
         try {
             ledger.deleteCursor("c1");
@@ -207,7 +215,11 @@ public class ManagedLedgerErrorsTest extends MockedBookKeeperTestCase {
 
         factory = new ManagedLedgerFactoryImpl(bkc, zkc);
 
-        zkc.failAfter(1, Code.CONNECTIONLOSS);
+
+        zkc.failConditional(Code.CONNECTIONLOSS, (op, path) -> {
+                return path.equals("/managed-ledgers/my_test_ledger")
+                    && op == MockZooKeeper.Op.SET;
+            });
 
         try {
             ledger = factory.open("my_test_ledger");
@@ -229,7 +241,10 @@ public class ManagedLedgerErrorsTest extends MockedBookKeeperTestCase {
 
         factory = new ManagedLedgerFactoryImpl(bkc, zkc);
 
-        zkc.failAfter(2, Code.CONNECTIONLOSS);
+        zkc.failConditional(Code.CONNECTIONLOSS, (op, path) -> {
+                return path.equals("/managed-ledgers/my_test_ledger")
+                    && op == MockZooKeeper.Op.GET_CHILDREN;
+            });
 
         try {
             ledger = factory.open("my_test_ledger");
@@ -252,7 +267,10 @@ public class ManagedLedgerErrorsTest extends MockedBookKeeperTestCase {
 
         factory = new ManagedLedgerFactoryImpl(bkc, zkc);
 
-        zkc.failAfter(3, Code.CONNECTIONLOSS);
+        zkc.failConditional(Code.CONNECTIONLOSS, (op, path) -> {
+                return path.equals("/managed-ledgers/my_test_ledger/c1")
+                    && op == MockZooKeeper.Op.GET;
+            });
 
         try {
             ledger = factory.open("my_test_ledger");
@@ -302,9 +320,12 @@ public class ManagedLedgerErrorsTest extends MockedBookKeeperTestCase {
     public void errorInUpdatingLedgersList() throws Exception {
         ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(1));
 
-        final CountDownLatch latch = new CountDownLatch(1);
+        CompletableFuture<Void> promise = new CompletableFuture<>();
 
-        zkc.failAfter(0, Code.CONNECTIONLOSS);
+        zkc.failConditional(Code.CONNECTIONLOSS, (op, path) -> {
+                return path.equals("/managed-ledgers/my_test_ledger")
+                    && op == MockZooKeeper.Op.SET;
+            });
 
         ledger.asyncAddEntry("entry".getBytes(), new AddEntryCallback() {
             public void addFailed(ManagedLedgerException exception, Object ctx) {
@@ -318,22 +339,25 @@ public class ManagedLedgerErrorsTest extends MockedBookKeeperTestCase {
 
         ledger.asyncAddEntry("entry".getBytes(), new AddEntryCallback() {
             public void addFailed(ManagedLedgerException exception, Object ctx) {
-                latch.countDown();
+                promise.complete(null);
             }
 
             public void addComplete(Position position, Object ctx) {
-                fail("should have failed");
+                promise.completeExceptionally(new Exception("should have failed"));
             }
         }, null);
 
-        latch.await();
+        promise.get();
     }
 
     @Test
     public void recoverAfterZnodeVersionError() throws Exception {
         ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(1));
 
-        zkc.failNow(Code.BADVERSION);
+        zkc.failConditional(Code.BADVERSION, (op, path) -> {
+                return path.equals("/managed-ledgers/my_test_ledger")
+                    && op == MockZooKeeper.Op.SET;
+            });
 
         // First write will succeed
         ledger.addEntry("test".getBytes());
@@ -369,7 +393,11 @@ public class ManagedLedgerErrorsTest extends MockedBookKeeperTestCase {
         assertEquals(cursor.getNumberOfEntriesInBacklog(false), 1);
 
         bkc.failNow(BKException.Code.BookieHandleNotAvailableException);
-        zkc.failNow(Code.CONNECTIONLOSS);
+        zkc.failConditional(Code.CONNECTIONLOSS, (op, path) -> {
+                return path.equals("/managed-ledgers/my_test_ledger")
+                    && op == MockZooKeeper.Op.SET;
+            });
+
         try {
             ledger.addEntry("entry-2".getBytes());
             fail("should fail");
@@ -458,7 +486,10 @@ public class ManagedLedgerErrorsTest extends MockedBookKeeperTestCase {
         Position position = ledger.addEntry("entry".getBytes());
 
         bkc.failNow(BKException.Code.BookieHandleNotAvailableException);
-        zkc.failNow(Code.CONNECTIONLOSS);
+        zkc.failConditional(Code.CONNECTIONLOSS, (op, path) -> {
+                return path.equals("/managed-ledgers/my_test_ledger/my-cursor")
+                    && op == MockZooKeeper.Op.SET;
+            });
 
         try {
             cursor.markDelete(position);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 2b02e0b..3e7a51d 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -115,6 +115,7 @@ import org.apache.pulsar.metadata.api.Stat;
 import org.apache.pulsar.metadata.impl.zookeeper.ZKMetadataStore;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.MockZooKeeper;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
@@ -1467,7 +1468,11 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         assertEquals(ledger.getLedgersInfoAsList().size(), 1);
 
         bkc.failNow(BKException.Code.NoBookieAvailableException);
-        zkc.failNow(Code.CONNECTIONLOSS);
+        zkc.failConditional(Code.CONNECTIONLOSS, (op, path) -> {
+                return path.equals("/managed-ledgers/my_test_ledger")
+                    && op == MockZooKeeper.Op.SET;
+            });
+
         try {
             ledger.addEntry("entry".getBytes());
             fail("Should have received exception");
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplTest.java
index 2337b4d..e41c550 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplTest.java
@@ -21,6 +21,7 @@ package org.apache.bookkeeper.mledger.impl;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.fail;
 
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -34,6 +35,7 @@ import org.apache.pulsar.metadata.api.Stat;
 import org.apache.pulsar.metadata.impl.zookeeper.ZKMetadataStore;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.MockZooKeeper;
 import org.apache.zookeeper.ZooDefs;
 import org.testng.annotations.Test;
 
@@ -43,7 +45,10 @@ public class MetaStoreImplTest extends MockedBookKeeperTestCase {
     void getMLList() throws Exception {
         MetaStore store = new MetaStoreImpl(new ZKMetadataStore(zkc), executor);
 
-        zkc.failNow(Code.CONNECTIONLOSS);
+        zkc.failConditional(Code.CONNECTIONLOSS, (op, path) -> {
+                return op == MockZooKeeper.Op.GET_CHILDREN
+                    && path.equals("/managed-ledgers");
+            });
 
         try {
             store.getManagedLedgers();
@@ -129,22 +134,22 @@ public class MetaStoreImplTest extends MockedBookKeeperTestCase {
     void failInCreatingMLnode() throws Exception {
         MetaStore store = new MetaStoreImpl(new ZKMetadataStore(zkc), executor);
 
-        final CountDownLatch latch = new CountDownLatch(1);
+        final CompletableFuture<Void> promise = new CompletableFuture<>();
 
-        zkc.failAfter(1, Code.CONNECTIONLOSS);
+        zkc.failConditional(Code.CONNECTIONLOSS, (op, path) -> {
+                return op == MockZooKeeper.Op.CREATE;
+            });
 
         store.getManagedLedgerInfo("my_test", false, new MetaStoreCallback<MLDataFormats.ManagedLedgerInfo>() {
             public void operationFailed(MetaStoreException e) {
-                // Ok
-                latch.countDown();
+                promise.complete(null);
             }
 
             public void operationComplete(ManagedLedgerInfo result, Stat version) {
-                fail("Operation should have failed");
+                promise.completeExceptionally(new Exception("Operation should have failed"));
             }
         });
-
-        latch.await();
+        promise.get();
     }
 
     @Test(timeOut = 20000)
@@ -153,34 +158,36 @@ public class MetaStoreImplTest extends MockedBookKeeperTestCase {
 
         zkc.create("/managed-ledgers/my_test", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
 
-        final CountDownLatch latch = new CountDownLatch(1);
+        final CompletableFuture<Void> promise = new CompletableFuture<>();
 
         ManagedCursorInfo info = ManagedCursorInfo.newBuilder().setCursorsLedgerId(1).build();
         store.asyncUpdateCursorInfo("my_test", "c1", info, null, new MetaStoreCallback<Void>() {
             public void operationFailed(MetaStoreException e) {
-                fail("should have succeeded");
+                promise.completeExceptionally(e);
             }
 
             public void operationComplete(Void result, Stat version) {
                 // Update again using the version
-                zkc.failNow(Code.CONNECTIONLOSS);
+                zkc.failConditional(Code.CONNECTIONLOSS, (op, path) -> {
+                        return op == MockZooKeeper.Op.SET
+                            && path.contains("my_test") && path.contains("c1");
+                    });
 
                 ManagedCursorInfo info = ManagedCursorInfo.newBuilder().setCursorsLedgerId(2).build();
                 store.asyncUpdateCursorInfo("my_test", "c1", info, version, new MetaStoreCallback<Void>() {
                     public void operationFailed(MetaStoreException e) {
                         // ok
-                        latch.countDown();
+                        promise.complete(null);
                     }
 
                     @Override
                     public void operationComplete(Void result, Stat version) {
-                        fail("should have failed");
+                        promise.completeExceptionally(new Exception("should have failed"));
                     }
                 });
             }
         });
-
-        latch.await();
+        promise.get();
     }
 
     @Test(timeOut = 20000)
@@ -189,31 +196,34 @@ public class MetaStoreImplTest extends MockedBookKeeperTestCase {
 
         zkc.create("/managed-ledgers/my_test", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
 
-        final CountDownLatch latch = new CountDownLatch(1);
+        final CompletableFuture<Void> promise = new CompletableFuture<>();
 
         store.getManagedLedgerInfo("my_test", false, new MetaStoreCallback<ManagedLedgerInfo>() {
             public void operationFailed(MetaStoreException e) {
-                fail("should have succeeded");
+                promise.completeExceptionally(e);
             }
 
             public void operationComplete(ManagedLedgerInfo mlInfo, Stat version) {
                 // Update again using the version
-                zkc.failNow(Code.BADVERSION);
+                zkc.failConditional(Code.BADVERSION, (op, path) -> {
+                        return op == MockZooKeeper.Op.SET
+                            && path.contains("my_test");
+                    });
 
                 store.asyncUpdateLedgerIds("my_test", mlInfo, version, new MetaStoreCallback<Void>() {
                     public void operationFailed(MetaStoreException e) {
                         // ok
-                        latch.countDown();
+                        promise.complete(null);
                     }
 
                     @Override
                     public void operationComplete(Void result, Stat version) {
-                        fail("should have failed");
+                        promise.completeExceptionally(new Exception("should have failed"));
                     }
                 });
             }
         });
 
-        latch.await();
+        promise.get();
     }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index 7d45bce..4a5cfdb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -70,7 +70,6 @@ import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.common.conf.InternalConfigurationData;
 import org.apache.pulsar.common.naming.NamespaceName;
-import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData;
 import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType;
@@ -86,6 +85,7 @@ import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.MockZooKeeper;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.mockito.ArgumentCaptor;
@@ -93,7 +93,12 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public class AdminTest extends MockedPulsarServiceBaseTest {
+    private static final Logger log = LoggerFactory.getLogger(AdminTest.class);
+
     private final String configClusterName = "use";
     private ConfigurationCacheService configurationCache;
     private Clusters clusters;
@@ -318,7 +323,10 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
         }
 
         // Test zk failures
-        mockZooKeeper.failNow(Code.SESSIONEXPIRED);
+        mockZooKeeper.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
+                return op == MockZooKeeper.Op.GET_CHILDREN
+                    && path.equals("/admin/clusters");
+            });
         configurationCache.clustersListCache().clear();
         try {
             clusters.getClusters();
@@ -327,7 +335,10 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
             assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
         }
 
-        mockZooKeeper.failNow(Code.SESSIONEXPIRED);
+        mockZooKeeper.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
+                return op == MockZooKeeper.Op.CREATE
+                    && path.equals("/admin/clusters/test");
+            });
         try {
             clusters.createCluster("test", new ClusterData("http://broker.messaging.test.example.com"));
             fail("should have failed");
@@ -335,7 +346,10 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
             assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
         }
 
-        mockZooKeeper.failNow(Code.SESSIONEXPIRED);
+        mockZooKeeper.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
+                return op == MockZooKeeper.Op.GET
+                    && path.equals("/admin/clusters/test");
+            });
         try {
             clusters.updateCluster("test", new ClusterData("http://broker.messaging.test.example.com"));
             fail("should have failed");
@@ -343,7 +357,11 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
             assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
         }
 
-        mockZooKeeper.failNow(Code.SESSIONEXPIRED);
+        mockZooKeeper.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
+                return op == MockZooKeeper.Op.GET
+                    && path.equals("/admin/clusters/test");
+            });
+
         try {
             clusters.getCluster("test");
             fail("should have failed");
@@ -351,7 +369,11 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
             assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
         }
 
-        mockZooKeeper.failAfter(0, Code.SESSIONEXPIRED);
+        mockZooKeeper.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
+                return op == MockZooKeeper.Op.GET_CHILDREN
+                    && path.equals("/admin/policies");
+            });
+
         try {
             clusters.deleteCluster("use");
             fail("should have failed");
@@ -359,7 +381,10 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
             assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
         }
 
-        mockZooKeeper.failAfter(1, Code.SESSIONEXPIRED);
+        mockZooKeeper.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
+                return op == MockZooKeeper.Op.GET
+                    && path.equals("/admin/clusters/use/namespaceIsolationPolicies");
+            });
         try {
             clusters.deleteCluster("use");
             fail("should have failed");
@@ -439,7 +464,10 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
         }
 
         // Test zk failures
-        mockZooKeeper.failNow(Code.SESSIONEXPIRED);
+        mockZooKeeper.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
+                return op == MockZooKeeper.Op.GET_CHILDREN
+                    && path.equals("/admin/policies");
+            });
         try {
             properties.getTenants();
             fail("should have failed");
@@ -447,7 +475,10 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
             assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
         }
 
-        mockZooKeeper.failNow(Code.SESSIONEXPIRED);
+        mockZooKeeper.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
+                return op == MockZooKeeper.Op.GET
+                    && path.equals("/admin/policies/my-tenant");
+            });
         try {
             properties.getTenantAdmin("my-tenant");
             fail("should have failed");
@@ -455,7 +486,10 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
             assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
         }
 
-        mockZooKeeper.failNow(Code.SESSIONEXPIRED);
+        mockZooKeeper.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
+                return op == MockZooKeeper.Op.GET
+                    && path.equals("/admin/policies/my-tenant");
+            });
         try {
             properties.updateTenant("my-tenant", newPropertyAdmin);
             fail("should have failed");
@@ -463,7 +497,10 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
             assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
         }
 
-        mockZooKeeper.failNow(Code.SESSIONEXPIRED);
+        mockZooKeeper.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
+                return op == MockZooKeeper.Op.CREATE
+                    && path.equals("/admin/policies/test");
+            });
         try {
             properties.createTenant("test", tenantInfo);
             fail("should have failed");
@@ -471,7 +508,10 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
             assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
         }
 
-        mockZooKeeper.failNow(Code.SESSIONEXPIRED);
+        mockZooKeeper.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
+                return op == MockZooKeeper.Op.GET_CHILDREN
+                    && path.equals("/admin/policies/my-tenant");
+            });
         try {
             properties.deleteTenant("my-tenant");
             fail("should have failed");
@@ -480,7 +520,11 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
         }
 
         properties.createTenant("error-property", tenantInfo);
-        mockZooKeeper.failAfter(2, Code.SESSIONEXPIRED);
+
+        mockZooKeeper.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
+                return op == MockZooKeeper.Op.DELETE
+                    && path.equals("/admin/policies/error-property");
+            });
         try {
             properties.deleteTenant("error-property");
             fail("should have failed");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index 23f8954..66193a0b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -81,7 +81,6 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.OffloadPolicies;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.Policies;
@@ -93,6 +92,7 @@ import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.MockZooKeeper;
 import org.apache.zookeeper.ZooDefs;
 import org.mockito.ArgumentCaptor;
 import org.mockito.ArgumentMatcher;
@@ -101,9 +101,11 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class NamespacesTest extends MockedPulsarServiceBaseTest {
-
+    private static final Logger log = LoggerFactory.getLogger(NamespacesTest.class);
     private Namespaces namespaces;
 
     private List<NamespaceName> testLocalNamespaces;
@@ -226,7 +228,10 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
             assertEquals(e.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
         }
 
-        mockZooKeeper.failNow(Code.SESSIONEXPIRED);
+        mockZooKeeper.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
+                return op == MockZooKeeper.Op.CREATE
+                    && path.equals("/admin/policies/my-tenant/use/my-namespace-3");
+            });
         try {
             namespaces.createNamespace(this.testTenant, "use", "my-namespace-3", new BundlesData());
             fail("should have failed");
@@ -262,7 +267,10 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
         }
 
         // ZK Errors
-        mockZooKeeper.failNow(Code.SESSIONEXPIRED);
+        mockZooKeeper.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
+                return op == MockZooKeeper.Op.GET_CHILDREN
+                    && path.equals("/admin/policies/my-tenant");
+            });
         try {
             namespaces.getTenantNamespaces(this.testTenant);
             fail("should have failed");
@@ -270,7 +278,10 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
             // Ok
         }
 
-        mockZooKeeper.failNow(Code.SESSIONEXPIRED);
+        mockZooKeeper.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
+                return op == MockZooKeeper.Op.GET_CHILDREN
+                    && path.equals("/admin/policies/my-tenant/use");
+            });
         try {
             namespaces.getNamespacesForCluster(this.testTenant, this.testLocalCluster);
             fail("should have failed");
@@ -346,7 +357,14 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
 
         NamespaceName testNs = this.testLocalNamespaces.get(1);
 
-        mockZooKeeper.failNow(Code.SESSIONEXPIRED);
+        mockZooKeeper.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
+                // test is disabled and failing so I can't see what paths are needed here
+                // if it ever gets enabled and fixed, first check what is expected and update these
+                // paths
+                log.info("Condition1: {} {}", op, path);
+                return true;
+            });
+
         try {
             namespaces.getPolicies(testNs.getTenant(), testNs.getCluster(), testNs.getLocalName());
             fail("should have failed");
@@ -354,7 +372,13 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
             // Ok
         }
 
-        mockZooKeeper.failNow(Code.SESSIONEXPIRED);
+        mockZooKeeper.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
+                // test is disabled and failing so I can't see what paths are needed here
+                // if it ever gets enabled and fixed, first check what is expected and update these
+                // paths
+                log.info("Condition2: {} {}", op, path);
+                return true;
+            });
         try {
             namespaces.getPermissions(testNs.getTenant(), testNs.getCluster(), testNs.getLocalName());
             fail("should have failed");
@@ -362,7 +386,13 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
             // Ok
         }
 
-        mockZooKeeper.failNow(Code.SESSIONEXPIRED);
+        mockZooKeeper.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
+                // test is disabled and failing so I can't see what paths are needed here
+                // if it ever gets enabled and fixed, first check what is expected and update these
+                // paths
+                log.info("Condition3: {} {}", op, path);
+                return true;
+            });
         try {
             namespaces.grantPermissionOnNamespace(testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(),
                     "other-role", EnumSet.of(AuthAction.consume));
@@ -371,7 +401,13 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
             // Ok
         }
 
-        mockZooKeeper.failNow(Code.BADVERSION);
+        mockZooKeeper.failConditional(Code.BADVERSION, (op, path) -> {
+                // test is disabled and failing so I can't see what paths are needed here
+                // if it ever gets enabled and fixed, first check what is expected and update these
+                // paths
+                log.info("Condition4: {} {}", op, path);
+                return true;
+            });
         try {
             namespaces.grantPermissionOnNamespace(testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(),
                     "other-role", EnumSet.of(AuthAction.consume));
@@ -380,7 +416,13 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
             assertEquals(e.getResponse().getStatus(), Status.CONFLICT.getStatusCode());
         }
 
-        mockZooKeeper.failNow(Code.BADVERSION);
+        mockZooKeeper.failConditional(Code.BADVERSION, (op, path) -> {
+                // test is disabled and failing so I can't see what paths are needed here
+                // if it ever gets enabled and fixed, first check what is expected and update these
+                // paths
+                log.info("Condition5: {} {}", op, path);
+                return true;
+            });
         try {
             namespaces.revokePermissionsOnNamespace(testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(),
                     "other-role");
@@ -389,7 +431,13 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
             assertEquals(e.getResponse().getStatus(), Status.CONFLICT.getStatusCode());
         }
 
-        mockZooKeeper.failNow(Code.SESSIONEXPIRED);
+        mockZooKeeper.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
+                // test is disabled and failing so I can't see what paths are needed here
+                // if it ever gets enabled and fixed, first check what is expected and update these
+                // paths
+                log.info("Condition6: {} {}", op, path);
+                return true;
+            });
         try {
             namespaces.revokePermissionsOnNamespace(testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(),
                     "other-role");
@@ -469,7 +517,11 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
             mockZooKeeper.unsetAlwaysFail();
         }
 
-        mockZooKeeper.failNow(Code.BADVERSION);
+        mockZooKeeper.failConditional(Code.BADVERSION, (op, path) -> {
+                return op == MockZooKeeper.Op.SET
+                    && path.equals("/admin/policies/my-tenant/global/test-global-ns1");
+            });
+
         try {
             namespaces.setNamespaceReplicationClusters(this.testTenant, "global",
                     this.testGlobalNamespaces.get(0).getLocalName(), Lists.newArrayList("use"));
@@ -493,7 +545,11 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
             assertEquals(e.getResponse().getStatus(), Status.NOT_FOUND.getStatusCode());
         }
 
-        mockZooKeeper.failNow(Code.SESSIONEXPIRED);
+        mockZooKeeper.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
+                return op == MockZooKeeper.Op.GET
+                    && path.equals("/admin/policies/my-tenant/global/test-global-ns1");
+            });
+
         pulsar.getConfigurationCache().policiesCache().clear();
 
         // ensure the ZooKeeper read happens, bypassing the cache
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
index fef3fa8..35e269d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java
@@ -56,8 +56,11 @@ import org.apache.zookeeper.MockZooKeeper;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class OwnershipCacheTest {
+    private static final Logger log = LoggerFactory.getLogger(OwnershipCacheTest.class);
 
     private PulsarService pulsar;
     private ServiceConfiguration config;
@@ -183,7 +186,11 @@ public class OwnershipCacheTest {
         assertEquals(data1, readOnlyData);
 
         MockZooKeeper mockZk = (MockZooKeeper) zkCache.getZooKeeper();
-        mockZk.failNow(KeeperException.Code.NONODE);
+        mockZk.failConditional(KeeperException.Code.NONODE, (op, path) -> {
+                return op == MockZooKeeper.Op.GET
+                    && path.equals("/namespace/pulsar/test/ns-none/0x00000000_0xffffffff");
+            });
+
         Optional<NamespaceEphemeralData> res = cache
                 .getOwnerAsync(bundleFactory.getFullBundle(NamespaceName.get("pulsar/test/ns-none"))).get();
         assertFalse(res.isPresent());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ZooKeeperSessionExpireRecoveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ZooKeeperSessionExpireRecoveryTest.java
index 7d06180..dc6391c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ZooKeeperSessionExpireRecoveryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ZooKeeperSessionExpireRecoveryTest.java
@@ -21,25 +21,26 @@ package org.apache.pulsar.broker.zookeeper;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
+import com.google.common.collect.Sets;
+
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.zookeeper.KeeperException.Code;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
+import org.apache.zookeeper.MockZooKeeper;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import com.google.common.collect.Sets;
-
 public class ZooKeeperSessionExpireRecoveryTest extends MockedPulsarServiceBaseTest {
 
-    @BeforeClass
+    @BeforeMethod
     @Override
     protected void setup() throws Exception {
         super.internalSetup();
     }
 
-    @AfterClass
+    @AfterMethod
     @Override
     protected void cleanup() throws Exception {
         super.internalCleanup();
@@ -54,7 +55,10 @@ public class ZooKeeperSessionExpireRecoveryTest extends MockedPulsarServiceBaseT
 
         assertTrue(Sets.newHashSet(admin.clusters().getClusters()).contains("my-cluster"));
 
-        mockZooKeeper.failNow(Code.SESSIONEXPIRED);
+        mockZooKeeper.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
+                return op == MockZooKeeper.Op.CREATE
+                    && path.equals("/admin/clusters/my-cluster-2");
+            });
 
         assertTrue(Sets.newHashSet(admin.clusters().getClusters()).contains("my-cluster"));
 
diff --git a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java
index 339a7bf..b7ab036 100644
--- a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java
+++ b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java
@@ -48,6 +48,7 @@ import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.KeeperException.SessionExpiredException;
+import org.apache.zookeeper.MockZooKeeper;
 import org.apache.zookeeper.ZooDefs;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -106,7 +107,10 @@ public class DiscoveryServiceTest extends BaseDiscoveryTestSetup {
         assertEquals(m.partitions, 0);
 
         // Simulate ZK error
-        mockZooKeeper.failNow(Code.SESSIONEXPIRED);
+        mockZooKeeper.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
+                return op == MockZooKeeper.Op.GET
+                    && path.equals("/admin/partitioned-topics/test/local/ns/persistent/my-topic-2");
+            });
         TopicName topic2 = TopicName.get("persistent://test/local/ns/my-topic-2");
         CompletableFuture<PartitionedTopicMetadata> future = service.getDiscoveryProvider()
                 .getPartitionedTopicMetadata(service, topic2, "role", null);
diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZooKeeperSessionWatcherTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZooKeeperSessionWatcherTest.java
index 5da73d3..9c44103 100644
--- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZooKeeperSessionWatcherTest.java
+++ b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZooKeeperSessionWatcherTest.java
@@ -160,7 +160,6 @@ public class ZooKeeperSessionWatcherTest {
 
     @Test
     public void testRun6() throws Exception {
-        zkClient.failAfter(0, Code.OK);
         sessionWatcher.run();
         assertFalse(sessionWatcher.isShutdownStarted());
         assertEquals(sessionWatcher.getKeeperState(), KeeperState.SyncConnected);
diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
index 39f632f..b3db3d9 100644
--- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
+++ b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
@@ -64,8 +64,11 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ZookeeperCacheTest {
+    private static final Logger log = LoggerFactory.getLogger(ZookeeperCacheTest.class);
     private MockZooKeeper zkClient;
     private OrderedScheduler executor;
     private ScheduledExecutorService scheduledExecutor;
@@ -122,7 +125,10 @@ public class ZookeeperCacheTest {
         zkCacheService.process(new WatchedEvent(Event.EventType.None, KeeperState.Expired, null));
         assertEquals(zkCache.get("/my_test").get(), newValue);
 
-        zkClient.failNow(Code.SESSIONEXPIRED);
+        zkClient.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
+                return op == MockZooKeeper.Op.GET
+                    && path.equals("/other");
+            });
 
         assertEquals(zkCache.get("/my_test").get(), newValue);
         try {
@@ -173,7 +179,10 @@ public class ZookeeperCacheTest {
         assertEquals(cache.get(), new TreeSet<String>(Lists.newArrayList("z1")));
         assertEquals(cache.get(), new TreeSet<String>(Lists.newArrayList("z1")));
         zkCacheService.process(new WatchedEvent(Event.EventType.None, KeeperState.Expired, null));
-        zkClient.failNow(Code.SESSIONEXPIRED);
+        zkClient.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
+                return op == MockZooKeeper.Op.GET_CHILDREN
+                    && path.equals("/test");
+            });
 
         try {
             cache.get();
@@ -227,7 +236,10 @@ public class ZookeeperCacheTest {
         assertTrue(cache.get().isEmpty());
         assertTrue(cache.get().isEmpty());
         zkCacheService.process(new WatchedEvent(Event.EventType.None, KeeperState.Expired, null));
-        zkClient.failNow(Code.SESSIONEXPIRED);
+        zkClient.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
+                return op == MockZooKeeper.Op.GET_CHILDREN
+                    && path.equals("/test");
+            });
 
         try {
             cache.get();
@@ -351,7 +363,11 @@ public class ZookeeperCacheTest {
         // case 3: update the znode directly while the client session is marked as expired. Verify that the new updates
         // is not seen in the cache
         zkClient.create("/other", newValue.getBytes(), null, null);
-        zkClient.failNow(Code.SESSIONEXPIRED);
+        zkClient.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
+                return op == MockZooKeeper.Op.GET
+                    && path.equals("/other");
+            });
+
         assertEquals(zkCache.get("/my_test").get(), newValue);
         assertEquals(zkCache.get("/my_test2").get(), value);
         try {
@@ -363,7 +379,6 @@ public class ZookeeperCacheTest {
 
         // case 4: directly delete the znode while the session is not re-connected yet. Verify that the deletion is not
         // seen by the cache
-        zkClient.failAfter(-1, Code.OK);
         zkClient.delete("/my_test2", -1);
         zkCacheService.process(new WatchedEvent(Event.EventType.None, KeeperState.SyncConnected, null));
         assertEquals(zkCache.get("/other").get(), newValue);
diff --git a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java
index a19c369..9a6a94d 100644
--- a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java
+++ b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java
@@ -26,13 +26,18 @@ import com.google.common.collect.SetMultimap;
 import com.google.common.collect.Sets;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import java.lang.reflect.Constructor;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.BiPredicate;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.zookeeper.AsyncCallback.Children2Callback;
 import org.apache.zookeeper.AsyncCallback.ChildrenCallback;
@@ -52,18 +57,30 @@ public class MockZooKeeper extends ZooKeeper {
     private TreeMap<String, Pair<byte[], Integer>> tree;
     private SetMultimap<String, Watcher> watchers;
     private volatile boolean stopped;
-    private boolean alwaysFail = false;
-
+    private AtomicReference<KeeperException.Code> alwaysFail;
+    private CopyOnWriteArrayList<Failure> failures;
     private ExecutorService executor;
 
-    private AtomicInteger stepsToFail;
-    private KeeperException.Code failReturnCode;
     private Watcher sessionWatcher;
     private long sessionId = 0L;
     private int readOpDelayMs;
 
     private ReentrantLock mutex;
 
+    public enum Op {
+        CREATE, GET, SET, GET_CHILDREN, DELETE, EXISTS, SYNC,
+    };
+
+    /** One-shot failure: the code to return and the predicate selecting the (op, path) it applies to. */
+    private static class Failure {
+        final KeeperException.Code failReturnCode;
+        final BiPredicate<Op, String> predicate;
+        Failure(KeeperException.Code failReturnCode, BiPredicate<Op, String> predicate) {
+            this.failReturnCode = failReturnCode;
+            this.predicate = predicate;
+        }
+    }
+
     public static MockZooKeeper newInstance() {
         return newInstance(null);
     }
@@ -100,8 +117,8 @@ public class MockZooKeeper extends ZooKeeper {
         SetMultimap<String, Watcher> w = HashMultimap.create();
         watchers = Multimaps.synchronizedSetMultimap(w);
         stopped = false;
-        stepsToFail = new AtomicInteger(-1);
-        failReturnCode = KeeperException.Code.OK;
+        alwaysFail = new AtomicReference<>(KeeperException.Code.OK);
+        failures = new CopyOnWriteArrayList<>();
     }
 
     private MockZooKeeper(String quorum) throws Exception {
@@ -132,7 +149,7 @@ public class MockZooKeeper extends ZooKeeper {
         final String parent = path.substring(0, path.lastIndexOf("/"));
 
         try {
-            checkProgrammedFail();
+            maybeThrowProgrammedFailure(Op.CREATE, path);
 
             if (stopped)
                 throw new KeeperException.ConnectionLossException();
@@ -207,9 +224,10 @@ public class MockZooKeeper extends ZooKeeper {
                 toNotifyParent.addAll(watchers.get(parent));
             }
 
-            if (getProgrammedFailStatus()) {
+            Optional<KeeperException.Code> failure = programmedFailure(Op.CREATE, path);
+            if (failure.isPresent()) {
                 mutex.unlock();
-                cb.processResult(failReturnCode.intValue(), path, ctx, null);
+                cb.processResult(failure.get().intValue(), path, ctx, null);
             } else if (stopped) {
                 mutex.unlock();
                 cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null);
@@ -244,7 +262,7 @@ public class MockZooKeeper extends ZooKeeper {
     public byte[] getData(String path, Watcher watcher, Stat stat) throws KeeperException {
         mutex.lock();
         try {
-            checkProgrammedFail();
+            maybeThrowProgrammedFailure(Op.GET, path);
             Pair<byte[], Integer> value = tree.get(path);
             if (value == null) {
                 throw new KeeperException.NoNodeException(path);
@@ -266,8 +284,9 @@ public class MockZooKeeper extends ZooKeeper {
     public void getData(final String path, boolean watch, final DataCallback cb, final Object ctx) {
         executor.execute(() -> {
             checkReadOpDelay();
-            if (getProgrammedFailStatus()) {
-                cb.processResult(failReturnCode.intValue(), path, ctx, null, null);
+            Optional<KeeperException.Code> failure = programmedFailure(Op.GET, path);
+            if (failure.isPresent()) {
+                cb.processResult(failure.get().intValue(), path, ctx, null, null);
                 return;
             } else if (stopped) {
                 cb.processResult(KeeperException.Code.ConnectionLoss, path, ctx, null, null);
@@ -297,9 +316,10 @@ public class MockZooKeeper extends ZooKeeper {
         executor.execute(() -> {
             checkReadOpDelay();
             mutex.lock();
-            if (getProgrammedFailStatus()) {
+            Optional<KeeperException.Code> failure = programmedFailure(Op.GET, path);
+            if (failure.isPresent()) {
                 mutex.unlock();
-                cb.processResult(failReturnCode.intValue(), path, ctx, null, null);
+                cb.processResult(failure.get().intValue(), path, ctx, null, null);
                 return;
             } else if (stopped) {
                 mutex.unlock();
@@ -328,9 +348,10 @@ public class MockZooKeeper extends ZooKeeper {
     public void getChildren(final String path, final Watcher watcher, final ChildrenCallback cb, final Object ctx) {
         executor.execute(() -> {
             mutex.lock();
-            if (getProgrammedFailStatus()) {
+            Optional<KeeperException.Code> failure = programmedFailure(Op.GET_CHILDREN, path);
+            if (failure.isPresent()) {
                 mutex.unlock();
-                cb.processResult(failReturnCode.intValue(), path, ctx, null);
+                cb.processResult(failure.get().intValue(), path, ctx, null);
                 return;
             } else if (stopped) {
                 mutex.unlock();
@@ -373,7 +394,7 @@ public class MockZooKeeper extends ZooKeeper {
     public List<String> getChildren(String path, Watcher watcher) throws KeeperException {
         mutex.lock();
         try {
-            checkProgrammedFail();
+            maybeThrowProgrammedFailure(Op.GET_CHILDREN, path);
 
             if (!tree.containsKey(path)) {
                 throw new KeeperException.NoNodeException();
@@ -409,7 +430,7 @@ public class MockZooKeeper extends ZooKeeper {
     public List<String> getChildren(String path, boolean watch) throws KeeperException, InterruptedException {
         mutex.lock();
         try {
-            checkProgrammedFail();
+            maybeThrowProgrammedFailure(Op.GET_CHILDREN, path);
 
             if (stopped) {
                 throw new KeeperException.ConnectionLossException();
@@ -445,9 +466,11 @@ public class MockZooKeeper extends ZooKeeper {
     public void getChildren(final String path, boolean watcher, final Children2Callback cb, final Object ctx) {
         executor.execute(() -> {
             mutex.lock();
-            if (getProgrammedFailStatus()) {
+
+            Optional<KeeperException.Code> failure = programmedFailure(Op.GET_CHILDREN, path);
+            if (failure.isPresent()) {
                 mutex.unlock();
-                cb.processResult(failReturnCode.intValue(), path, ctx, null, null);
+                cb.processResult(failure.get().intValue(), path, ctx, null, null);
                 return;
             } else if (stopped) {
                 mutex.unlock();
@@ -490,7 +513,7 @@ public class MockZooKeeper extends ZooKeeper {
     public Stat exists(String path, boolean watch) throws KeeperException, InterruptedException {
         mutex.lock();
         try {
-            checkProgrammedFail();
+            maybeThrowProgrammedFailure(Op.EXISTS, path);
 
             if (stopped)
                 throw new KeeperException.ConnectionLossException();
@@ -511,7 +534,7 @@ public class MockZooKeeper extends ZooKeeper {
     public Stat exists(String path, Watcher watcher) throws KeeperException, InterruptedException {
         mutex.lock();
         try {
-            checkProgrammedFail();
+            maybeThrowProgrammedFailure(Op.EXISTS, path);
 
             if (stopped)
                 throw new KeeperException.ConnectionLossException();
@@ -536,9 +559,10 @@ public class MockZooKeeper extends ZooKeeper {
     public void exists(String path, boolean watch, StatCallback cb, Object ctx) {
         executor.execute(() -> {
             mutex.lock();
-            if (getProgrammedFailStatus()) {
+            Optional<KeeperException.Code> failure = programmedFailure(Op.EXISTS, path);
+            if (failure.isPresent()) {
                 mutex.unlock();
-                cb.processResult(failReturnCode.intValue(), path, ctx, null);
+                cb.processResult(failure.get().intValue(), path, ctx, null);
                 return;
             } else if (stopped) {
                 mutex.unlock();
@@ -560,9 +584,10 @@ public class MockZooKeeper extends ZooKeeper {
     public void exists(String path, Watcher watcher, StatCallback cb, Object ctx) {
         executor.execute(() -> {
             mutex.lock();
-            if (getProgrammedFailStatus()) {
+            Optional<KeeperException.Code> failure = programmedFailure(Op.EXISTS, path);
+            if (failure.isPresent()) {
                 mutex.unlock();
-                cb.processResult(failReturnCode.intValue(), path, ctx, null);
+                cb.processResult(failure.get().intValue(), path, ctx, null);
                 return;
             } else if (stopped) {
                 mutex.unlock();
@@ -587,8 +612,9 @@ public class MockZooKeeper extends ZooKeeper {
     @Override
     public void sync(String path, VoidCallback cb, Object ctx) {
         executor.execute(() -> {
-            if (getProgrammedFailStatus()) {
-                cb.processResult(failReturnCode.intValue(), path, ctx);
+            Optional<KeeperException.Code> failure = programmedFailure(Op.SYNC, path);
+            if (failure.isPresent()) {
+                cb.processResult(failure.get().intValue(), path, ctx);
                 return;
             } else if (stopped) {
                 cb.processResult(KeeperException.Code.ConnectionLoss, path, ctx);
@@ -608,7 +634,7 @@ public class MockZooKeeper extends ZooKeeper {
         int newVersion;
 
         try {
-            checkProgrammedFail();
+            maybeThrowProgrammedFailure(Op.SET, path);
 
             if (stopped) {
                 throw new KeeperException.ConnectionLossException();
@@ -657,9 +683,10 @@ public class MockZooKeeper extends ZooKeeper {
 
             mutex.lock();
 
-            if (getProgrammedFailStatus()) {
+            Optional<KeeperException.Code> failure = programmedFailure(Op.SET, path);
+            if (failure.isPresent()) {
                 mutex.unlock();
-                cb.processResult(failReturnCode.intValue(), path, ctx, null);
+                cb.processResult(failure.get().intValue(), path, ctx, null);
                 return;
             } else if (stopped) {
                 mutex.unlock();
@@ -703,7 +730,7 @@ public class MockZooKeeper extends ZooKeeper {
 
     @Override
     public void delete(final String path, int version) throws InterruptedException, KeeperException {
-        checkProgrammedFail();
+        maybeThrowProgrammedFailure(Op.DELETE, path);
 
         final Set<Watcher> toNotifyDelete;
         final Set<Watcher> toNotifyParent;
@@ -777,9 +804,10 @@ public class MockZooKeeper extends ZooKeeper {
         executor.execute(() -> {
             mutex.lock();
 
-            if (getProgrammedFailStatus()) {
+            Optional<KeeperException.Code> failure = programmedFailure(Op.DELETE, path);
+            if (failure.isPresent()) {
                 mutex.unlock();
-                cb.processResult(failReturnCode.intValue(), path, ctx);
+                cb.processResult(failure.get().intValue(), path, ctx);
             } else if (stopped) {
                 mutex.unlock();
                 cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx);
@@ -830,32 +858,37 @@ public class MockZooKeeper extends ZooKeeper {
         }
     }
 
-    void checkProgrammedFail() throws KeeperException {
-        if (stepsToFail.getAndDecrement() == 0 || this.alwaysFail) {
-            throw KeeperException.create(failReturnCode);
+    /** Returns the programmed failure code for this call, consuming a matching one-shot failure if any. */
+    Optional<KeeperException.Code> programmedFailure(Op op, String path) {
+        KeeperException.Code error = this.alwaysFail.get();
+        if (error != KeeperException.Code.OK) {
+            return Optional.of(error);
+        }
+        Optional<Failure> failure = failures.stream().filter(f -> f.predicate.test(op, path)).findFirst();
+        // remove() is false if a concurrent caller already consumed it; a one-shot failure must fire once.
+        if (!failure.isPresent() || !failures.remove(failure.get())) {
+            return Optional.empty();
+        } else {
+            return Optional.of(failure.get().failReturnCode);
         }
     }
 
-    boolean getProgrammedFailStatus() {
-        return stepsToFail.getAndDecrement() == 0;
+    void maybeThrowProgrammedFailure(Op op, String path) throws KeeperException {
+        Optional<KeeperException.Code> failure = programmedFailure(op, path);
+        if (failure.isPresent()) {
+            throw KeeperException.create(failure.get());
+        }
     }
 
-    public void failNow(KeeperException.Code rc) {
-        failAfter(0, rc);
+    public void failConditional(KeeperException.Code rc, BiPredicate<Op, String> predicate) {
+        failures.add(new Failure(rc, predicate));
     }
 
     public void setAlwaysFail(KeeperException.Code rc) {
-        this.alwaysFail = true;
-        this.failReturnCode = rc;
+        this.alwaysFail.set(rc);
     }
 
     public void unsetAlwaysFail() {
-        this.alwaysFail = false;
-    }
-
-    public void failAfter(int steps, KeeperException.Code rc) {
-        stepsToFail.set(steps);
-        failReturnCode = rc;
+        this.alwaysFail.set(KeeperException.Code.OK);
     }
 
     public void setSessionId(long id) {