You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/02/17 07:51:29 UTC

[GitHub] sijie closed pull request #1130: handle zookeeper session expire in zk ledger manager

sijie closed pull request #1130: handle zookeeper session expire in zk ledger manager
URL: https://github.com/apache/bookkeeper/pull/1130
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
index 753eebd27..1dbdf791d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
@@ -39,6 +39,7 @@
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import lombok.EqualsAndHashCode;
 import org.apache.bookkeeper.client.api.DigestType;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat;
@@ -52,6 +53,7 @@
  *
  * <p>It provides parsing and serialization methods of such metadata.
  */
+@EqualsAndHashCode
 public class LedgerMetadata implements org.apache.bookkeeper.client.api.LedgerMetadata {
     static final Logger LOG = LoggerFactory.getLogger(LedgerMetadata.class);
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
index 6f1872c05..b176ddfd7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
@@ -17,6 +17,7 @@
  */
 package org.apache.bookkeeper.meta;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 
 import io.netty.util.concurrent.DefaultThreadFactory;
@@ -67,7 +68,8 @@
 
     private static final Logger LOG = LoggerFactory.getLogger(AbstractZkLedgerManager.class);
 
-    private static final int ZK_CONNECT_BACKOFF_MS = 200;
+    @VisibleForTesting
+    static final int ZK_CONNECT_BACKOFF_MS = 200;
 
     protected final AbstractConfiguration conf;
     protected final ZooKeeper zk;
@@ -131,6 +133,13 @@ public void run() {
                         LOG.debug("Removed ledger metadata listener set on ledger {} as its ledger is deleted : {}",
                                 ledgerId, listenerSet.size());
                     }
+                    // notify `null` as indicator that a ledger is deleted
+                    // make this behavior consistent with `NodeDeleted` watched event.
+                    synchronized (listenerSet) {
+                        for (LedgerMetadataListener listener : listenerSet) {
+                            listener.onChanged(ledgerId, null);
+                        }
+                    }
                 }
             } else {
                 LOG.warn("Failed on read ledger metadata of ledger {} : {}", ledgerId, rc);
@@ -179,8 +188,16 @@ protected AbstractZkLedgerManager(AbstractConfiguration conf, ZooKeeper zk) {
 
     @Override
     public void process(WatchedEvent event) {
-        LOG.info("Received watched event {} from zookeeper based ledger manager.", event);
+        LOG.debug("Received watched event {} from zookeeper based ledger manager.", event);
         if (Event.EventType.None == event.getType()) {
+            if (Event.KeeperState.Expired == event.getState()) {
+                LOG.info("ZooKeeper client expired on ledger manager.");
+                Set<Long> keySet = new HashSet<Long>(listeners.keySet());
+                for (Long lid : keySet) {
+                    scheduler.submit(new ReadLedgerMetadataTask(lid));
+                    LOG.info("Re-read ledger metadata for {} after zookeeper session expired.", lid);
+                }
+            }
             return;
         }
         String path = event.getPath();
@@ -409,7 +426,7 @@ public void processResult(int rc, String path, Object ctx, byte[] data, Stat sta
     public void writeLedgerMetadata(final long ledgerId, final LedgerMetadata metadata,
                                     final GenericCallback<Void> cb) {
         Version v = metadata.getVersion();
-        if (Version.NEW == v || !(v instanceof LongVersion)) {
+        if (!(v instanceof LongVersion)) {
             cb.operationComplete(BKException.Code.MetadataVersionException, null);
             return;
         }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/versioning/LongVersion.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/versioning/LongVersion.java
index 4ec438826..862977a8c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/versioning/LongVersion.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/versioning/LongVersion.java
@@ -17,9 +17,12 @@
  */
 package org.apache.bookkeeper.versioning;
 
+import lombok.EqualsAndHashCode;
+
 /**
  * A version object holds integer version.
  */
+@EqualsAndHashCode
 public class LongVersion implements Version {
     protected long version;
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java
new file mode 100644
index 000000000..b7c8ef5e5
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java
@@ -0,0 +1,883 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.meta;
+
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
+import static org.apache.bookkeeper.meta.AbstractZkLedgerManager.ZK_CONNECT_BACKOFF_MS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.CALLS_REAL_METHODS;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.withSettings;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BKException.Code;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.LedgerMetadata;
+import org.apache.bookkeeper.common.testing.executors.MockExecutorController;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataListener;
+import org.apache.bookkeeper.test.TestCallbacks.GenericCallbackFuture;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.bookkeeper.versioning.LongVersion;
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.bookkeeper.zookeeper.MockZooKeeperTestCase;
+import org.apache.zookeeper.AsyncCallback.DataCallback;
+import org.apache.zookeeper.AsyncCallback.StatCallback;
+import org.apache.zookeeper.AsyncCallback.VoidCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.data.Stat;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+/**
+ * Unit test of {@link AbstractZkLedgerManager}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ AbstractZkLedgerManager.class, ZkUtils.class })
+public class AbstractZkLedgerManagerTest extends MockZooKeeperTestCase {
+
+    private ClientConfiguration conf;
+    private AbstractZkLedgerManager ledgerManager;
+    private ScheduledExecutorService scheduler;
+    private MockExecutorController schedulerController;
+    private LedgerMetadata metadata;
+
+    @Before
+    public void setup() throws Exception {
+        PowerMockito.mockStatic(Executors.class);
+
+        super.setup();
+
+        this.scheduler = PowerMockito.mock(ScheduledExecutorService.class);
+        this.schedulerController = new MockExecutorController()
+            .controlSubmit(scheduler)
+            .controlSchedule(scheduler)
+            .controlExecute(scheduler)
+            .controlScheduleAtFixedRate(scheduler, 10);
+        PowerMockito.when(Executors.newSingleThreadScheduledExecutor(any()))
+            .thenReturn(scheduler);
+
+        this.conf = new ClientConfiguration();
+        this.ledgerManager = mock(
+            AbstractZkLedgerManager.class,
+            withSettings()
+                .useConstructor(conf, mockZk)
+                .defaultAnswer(CALLS_REAL_METHODS));
+        this.metadata = new LedgerMetadata(
+            5, 3, 3,
+            DigestType.CRC32,
+            new byte[0],
+            Collections.emptyMap(),
+            false);
+
+        doAnswer(invocationOnMock -> {
+            long ledgerId = invocationOnMock.getArgument(0);
+            return String.valueOf(ledgerId);
+        }).when(ledgerManager).getLedgerPath(anyLong());
+        doAnswer(invocationOnMock -> {
+            String ledgerStr = invocationOnMock.getArgument(0);
+            return Long.parseLong(ledgerStr);
+        }).when(ledgerManager).getLedgerId(anyString());
+
+        // verify constructor
+        assertEquals(conf.getZkLedgersRootPath(), ledgerManager.ledgerRootPath);
+        assertSame(mockZk, ledgerManager.zk);
+        assertSame(conf, ledgerManager.conf);
+        assertSame(scheduler, ledgerManager.scheduler);
+    }
+
+    @After
+    public void teardown() throws Exception {
+        if (null != ledgerManager) {
+            ledgerManager.close();
+
+            // zookeeper is passed in, it should not be closed.
+            verify(mockZk, times(0)).close();
+            verify(scheduler, times(1)).shutdown();
+        }
+    }
+
+    @Test
+    public void testCreateLedgerMetadataSuccess() throws Exception {
+        long ledgerId = System.currentTimeMillis();
+        String ledgerStr = String.valueOf(ledgerId);
+        mockZkUtilsAsyncCreateFullPathOptimistic(
+            ledgerStr, CreateMode.PERSISTENT,
+            KeeperException.Code.OK.intValue(), ledgerStr
+        );
+
+        assertEquals(Version.NEW, metadata.getVersion());
+
+        GenericCallbackFuture<Void> callbackFuture = new GenericCallbackFuture<>();
+        ledgerManager.createLedgerMetadata(ledgerId, metadata, callbackFuture);
+        callbackFuture.get();
+
+        assertEquals(new LongVersion(0), metadata.getVersion());
+    }
+
+    @Test
+    public void testCreateLedgerMetadataNodeExists() throws Exception {
+        long ledgerId = System.currentTimeMillis();
+        String ledgerStr = String.valueOf(ledgerId);
+        mockZkUtilsAsyncCreateFullPathOptimistic(
+            ledgerStr, CreateMode.PERSISTENT,
+            KeeperException.Code.NODEEXISTS.intValue(), null);
+
+        assertEquals(Version.NEW, metadata.getVersion());
+
+        GenericCallbackFuture<Void> callbackFuture = new GenericCallbackFuture<>();
+        ledgerManager.createLedgerMetadata(ledgerId, metadata, callbackFuture);
+        try {
+            result(callbackFuture);
+            fail("Should fail to create ledger metadata if the ledger already exists");
+        } catch (Exception e) {
+            assertTrue(e instanceof BKException);
+            BKException bke = (BKException) e;
+            assertEquals(Code.LedgerExistException, bke.getCode());
+        }
+
+        // creation failed, so metadata should not be modified
+        assertEquals(Version.NEW, metadata.getVersion());
+    }
+
+    @Test
+    public void testCreateLedgerMetadataException() throws Exception {
+        long ledgerId = System.currentTimeMillis();
+        String ledgerStr = String.valueOf(ledgerId);
+        mockZkUtilsAsyncCreateFullPathOptimistic(
+            ledgerStr, CreateMode.PERSISTENT,
+            KeeperException.Code.CONNECTIONLOSS.intValue(), null);
+
+        assertEquals(Version.NEW, metadata.getVersion());
+
+        GenericCallbackFuture<Void> callbackFuture = new GenericCallbackFuture<>();
+        ledgerManager.createLedgerMetadata(ledgerId, metadata, callbackFuture);
+        try {
+            result(callbackFuture);
+            fail("Should fail to create ledger metadata when encountering zookeeper exception");
+        } catch (Exception e) {
+            assertTrue(e instanceof BKException);
+            BKException bke = (BKException) e;
+            assertEquals(Code.ZKException, bke.getCode());
+        }
+
+        // creation failed, so metadata should not be modified
+        assertEquals(Version.NEW, metadata.getVersion());
+    }
+
+    @Test
+    public void testRemoveLedgerMetadataSuccess() throws Exception {
+        long ledgerId = System.currentTimeMillis();
+        String ledgerStr = String.valueOf(ledgerId);
+        LongVersion version = new LongVersion(1234L);
+
+        mockZkDelete(
+            ledgerStr, (int) version.getLongVersion(),
+            KeeperException.Code.OK.intValue());
+
+        GenericCallbackFuture<Void> callbackFuture = new GenericCallbackFuture<>();
+        ledgerManager.removeLedgerMetadata(ledgerId, version, callbackFuture);
+        result(callbackFuture);
+
+        verify(mockZk, times(1))
+            .delete(eq(ledgerStr), eq(1234), any(VoidCallback.class), eq(null));
+    }
+
+    @Test
+    public void testRemoveLedgerMetadataVersionAny() throws Exception {
+        long ledgerId = System.currentTimeMillis();
+        String ledgerStr = String.valueOf(ledgerId);
+
+        mockZkDelete(
+            ledgerStr, -1,
+            KeeperException.Code.OK.intValue());
+
+        GenericCallbackFuture<Void> callbackFuture = new GenericCallbackFuture<>();
+        ledgerManager.removeLedgerMetadata(ledgerId, Version.ANY, callbackFuture);
+        result(callbackFuture);
+
+        verify(mockZk, times(1))
+            .delete(eq(ledgerStr), eq(-1), any(VoidCallback.class), eq(null));
+    }
+
+    @Test
+    public void testRemoveLedgerMetadataVersionNew() throws Exception {
+        testRemoveLedgerMetadataInvalidVersion(Version.NEW);
+    }
+
+    @Test
+    public void testRemoveLedgerMetadataUnknownVersionType() throws Exception {
+        Version version = mock(Version.class);
+        testRemoveLedgerMetadataInvalidVersion(version);
+    }
+
+    private void testRemoveLedgerMetadataInvalidVersion(Version version) throws Exception {
+        long ledgerId = System.currentTimeMillis();
+
+        GenericCallbackFuture<Void> callbackFuture = new GenericCallbackFuture<>();
+        ledgerManager.removeLedgerMetadata(ledgerId, version, callbackFuture);
+        try {
+            result(callbackFuture);
+            fail("Should fail to remove metadata if version is " + Version.NEW);
+        } catch (BKException bke) {
+            assertEquals(Code.MetadataVersionException, bke.getCode());
+        }
+    }
+
+    @Test
+    public void testRemoveLedgerMetadataNoNode() throws Exception {
+        long ledgerId = System.currentTimeMillis();
+        String ledgerStr = String.valueOf(ledgerId);
+        LongVersion version = new LongVersion(1234L);
+
+        mockZkDelete(
+            ledgerStr, (int) version.getLongVersion(),
+            KeeperException.Code.NONODE.intValue());
+
+        GenericCallbackFuture<Void> callbackFuture = new GenericCallbackFuture<>();
+        ledgerManager.removeLedgerMetadata(ledgerId, version, callbackFuture);
+        try {
+            result(callbackFuture);
+            fail("Should fail to remove metadata if no such ledger exists");
+        } catch (BKException bke) {
+            assertEquals(Code.NoSuchLedgerExistsException, bke.getCode());
+        }
+
+        verify(mockZk, times(1))
+            .delete(eq(ledgerStr), eq(1234), any(VoidCallback.class), eq(null));
+    }
+
+    @Test
+    public void testRemoveLedgerMetadataException() throws Exception {
+        long ledgerId = System.currentTimeMillis();
+        String ledgerStr = String.valueOf(ledgerId);
+        LongVersion version = new LongVersion(1234L);
+
+        mockZkDelete(
+            ledgerStr, (int) version.getLongVersion(),
+            KeeperException.Code.CONNECTIONLOSS.intValue());
+
+        GenericCallbackFuture<Void> callbackFuture = new GenericCallbackFuture<>();
+        ledgerManager.removeLedgerMetadata(ledgerId, version, callbackFuture);
+        try {
+            result(callbackFuture);
+            fail("Should fail to remove metadata if no such ledger exists");
+        } catch (BKException bke) {
+            assertEquals(Code.ZKException, bke.getCode());
+        }
+
+        verify(mockZk, times(1))
+            .delete(eq(ledgerStr), eq(1234), any(VoidCallback.class), eq(null));
+    }
+
+    @Test
+    public void testRemoveLedgerMetadataHierarchical() throws Exception {
+        HierarchicalLedgerManager hlm = new HierarchicalLedgerManager(conf, mockZk);
+        testRemoveLedgerMetadataHierarchicalLedgerManager(hlm);
+    }
+
+    @Test
+    public void testRemoveLedgerMetadataLongHierarchical() throws Exception {
+        LongHierarchicalLedgerManager hlm = new LongHierarchicalLedgerManager(conf, mockZk);
+        testRemoveLedgerMetadataHierarchicalLedgerManager(hlm);
+    }
+
+    private void testRemoveLedgerMetadataHierarchicalLedgerManager(AbstractZkLedgerManager lm) throws Exception {
+        long ledgerId = System.currentTimeMillis();
+        String ledgerStr = lm.getLedgerPath(ledgerId);
+        LongVersion version = new LongVersion(1234L);
+
+        mockZkUtilsAsyncDeleteFullPathOptimistic(
+            ledgerStr, (int) version.getLongVersion(),
+            KeeperException.Code.OK.intValue());
+
+        GenericCallbackFuture<Void> callbackFuture = new GenericCallbackFuture<>();
+        lm.removeLedgerMetadata(ledgerId, version, callbackFuture);
+        result(callbackFuture);
+
+        PowerMockito.verifyStatic(
+            ZkUtils.class, times(1));
+        ZkUtils.asyncDeleteFullPathOptimistic(
+            eq(mockZk), eq(ledgerStr), eq(1234), any(VoidCallback.class), eq(ledgerStr));
+
+    }
+
+    @Test
+    public void testReadLedgerMetadataSuccess() throws Exception {
+        long ledgerId = System.currentTimeMillis();
+        String ledgerStr = String.valueOf(ledgerId);
+
+        metadata.setVersion(new LongVersion(1234L));
+        Stat stat = mock(Stat.class);
+        when(stat.getVersion()).thenReturn(1234);
+        when(stat.getCtime()).thenReturn(metadata.getCtime());
+        mockZkGetData(
+            ledgerStr, false,
+            KeeperException.Code.OK.intValue(), metadata.serialize(), stat);
+
+        GenericCallbackFuture<LedgerMetadata> callbackFuture = new GenericCallbackFuture<>();
+        ledgerManager.readLedgerMetadata(ledgerId, callbackFuture);
+        LedgerMetadata readMetadata = result(callbackFuture);
+        assertEquals(metadata, readMetadata);
+
+        verify(mockZk, times(1))
+            .getData(eq(ledgerStr), eq(null), any(DataCallback.class), any());
+    }
+
+    @Test
+    public void testReadLedgerMetadataNoNode() throws Exception {
+        long ledgerId = System.currentTimeMillis();
+        String ledgerStr = String.valueOf(ledgerId);
+
+        mockZkGetData(
+            ledgerStr, false,
+            KeeperException.Code.NONODE.intValue(), null, null);
+
+        GenericCallbackFuture<LedgerMetadata> callbackFuture = new GenericCallbackFuture<>();
+        ledgerManager.readLedgerMetadata(ledgerId, callbackFuture);
+        try {
+            result(callbackFuture);
+            fail("Should fail on reading ledger metadata if a ledger doesn't exist");
+        } catch (BKException bke) {
+            assertEquals(Code.NoSuchLedgerExistsException, bke.getCode());
+        }
+
+        verify(mockZk, times(1))
+            .getData(eq(ledgerStr), eq(null), any(DataCallback.class), any());
+    }
+
+    @Test
+    public void testReadLedgerMetadataException() throws Exception {
+        long ledgerId = System.currentTimeMillis();
+        String ledgerStr = String.valueOf(ledgerId);
+
+        mockZkGetData(
+            ledgerStr, false,
+            KeeperException.Code.CONNECTIONLOSS.intValue(), null, null);
+
+        GenericCallbackFuture<LedgerMetadata> callbackFuture = new GenericCallbackFuture<>();
+        ledgerManager.readLedgerMetadata(ledgerId, callbackFuture);
+        try {
+            result(callbackFuture);
+            fail("Should fail on reading ledger metadata if a ledger doesn't exist");
+        } catch (BKException bke) {
+            assertEquals(Code.ZKException, bke.getCode());
+        }
+
+        verify(mockZk, times(1))
+            .getData(eq(ledgerStr), eq(null), any(DataCallback.class), any());
+    }
+
+    @Test
+    public void testReadLedgerMetadataStatMissing() throws Exception {
+        long ledgerId = System.currentTimeMillis();
+        String ledgerStr = String.valueOf(ledgerId);
+
+        mockZkGetData(
+            ledgerStr, false,
+            KeeperException.Code.OK.intValue(), metadata.serialize(), null);
+
+        GenericCallbackFuture<LedgerMetadata> callbackFuture = new GenericCallbackFuture<>();
+        ledgerManager.readLedgerMetadata(ledgerId, callbackFuture);
+        try {
+            result(callbackFuture);
+            fail("Should fail on reading ledger metadata if a ledger doesn't exist");
+        } catch (BKException bke) {
+            assertEquals(Code.ZKException, bke.getCode());
+        }
+
+        verify(mockZk, times(1))
+            .getData(eq(ledgerStr), eq(null), any(DataCallback.class), any());
+    }
+
+    @Test
+    public void testReadLedgerMetadataDataCorrupted() throws Exception {
+        long ledgerId = System.currentTimeMillis();
+        String ledgerStr = String.valueOf(ledgerId);
+
+        metadata.setVersion(new LongVersion(1234L));
+        Stat stat = mock(Stat.class);
+        when(stat.getVersion()).thenReturn(1234);
+        when(stat.getCtime()).thenReturn(metadata.getCtime());
+        mockZkGetData(
+            ledgerStr, false,
+            KeeperException.Code.OK.intValue(), new byte[0], stat);
+
+        GenericCallbackFuture<LedgerMetadata> callbackFuture = new GenericCallbackFuture<>();
+        ledgerManager.readLedgerMetadata(ledgerId, callbackFuture);
+        try {
+            result(callbackFuture);
+            fail("Should fail on reading ledger metadata if a ledger doesn't exist");
+        } catch (BKException bke) {
+            assertEquals(Code.ZKException, bke.getCode());
+        }
+
+        verify(mockZk, times(1))
+            .getData(eq(ledgerStr), eq(null), any(DataCallback.class), any());
+    }
+
+    @Test
+    public void testWriteLedgerMetadataSuccess() throws Exception {
+        long ledgerId = System.currentTimeMillis();
+        String ledgerStr = String.valueOf(ledgerId);
+
+        metadata.setVersion(new LongVersion(1234L));
+        Stat stat = mock(Stat.class);
+        when(stat.getVersion()).thenReturn(1235);
+        when(stat.getCtime()).thenReturn(metadata.getCtime());
+        mockZkSetData(
+            ledgerStr, metadata.serialize(), 1234,
+            KeeperException.Code.OK.intValue(), stat);
+
+        assertEquals(new LongVersion(1234L), metadata.getVersion());
+
+        GenericCallbackFuture<Void> callbackFuture = new GenericCallbackFuture<>();
+        ledgerManager.writeLedgerMetadata(ledgerId, metadata, callbackFuture);
+        result(callbackFuture);
+
+        assertEquals(new LongVersion(1235L), metadata.getVersion());
+
+        verify(mockZk, times(1))
+            .setData(eq(ledgerStr), any(byte[].class), eq(1234), any(StatCallback.class), any());
+    }
+
+    @Test
+    public void testWriteLedgerMetadataBadVersion() throws Exception {
+        long ledgerId = System.currentTimeMillis();
+        String ledgerStr = String.valueOf(ledgerId);
+
+        metadata.setVersion(new LongVersion(1234L));
+        mockZkSetData(
+            ledgerStr, metadata.serialize(), 1234,
+            KeeperException.Code.BADVERSION.intValue(), null);
+
+        assertEquals(new LongVersion(1234L), metadata.getVersion());
+
+        GenericCallbackFuture<Void> callbackFuture = new GenericCallbackFuture<>();
+        ledgerManager.writeLedgerMetadata(ledgerId, metadata, callbackFuture);
+        try {
+            result(callbackFuture);
+            fail("Should fail on writing ledger metadata if encountering bad version");
+        } catch (BKException bke) {
+            assertEquals(Code.MetadataVersionException, bke.getCode());
+        }
+
+        // version remain unchanged
+        assertEquals(new LongVersion(1234L), metadata.getVersion());
+
+        verify(mockZk, times(1))
+            .setData(eq(ledgerStr), any(byte[].class), eq(1234), any(StatCallback.class), any());
+    }
+
+    @Test
+    public void testWriteLedgerMetadataException() throws Exception {
+        long ledgerId = System.currentTimeMillis();
+        String ledgerStr = String.valueOf(ledgerId);
+
+        metadata.setVersion(new LongVersion(1234L));
+        mockZkSetData(
+            ledgerStr, metadata.serialize(), 1234,
+            KeeperException.Code.CONNECTIONLOSS.intValue(), null);
+
+        assertEquals(new LongVersion(1234L), metadata.getVersion());
+
+        GenericCallbackFuture<Void> callbackFuture = new GenericCallbackFuture<>();
+        ledgerManager.writeLedgerMetadata(ledgerId, metadata, callbackFuture);
+        try {
+            result(callbackFuture);
+            fail("Should fail on writing ledger metadata if encountering zookeeper exceptions");
+        } catch (BKException bke) {
+            assertEquals(Code.ZKException, bke.getCode());
+        }
+
+        // version remain unchanged
+        assertEquals(new LongVersion(1234L), metadata.getVersion());
+
+        verify(mockZk, times(1))
+            .setData(eq(ledgerStr), any(byte[].class), eq(1234), any(StatCallback.class), any());
+    }
+
+    @Test
+    public void testWriteLedgerMetadataInvalidVersion() throws Exception {
+        Version[] versions = new Version[] {
+            Version.NEW,
+            Version.ANY,
+            mock(Version.class)
+        };
+        for (Version version : versions) {
+            testWriteLedgerMetadataInvalidVersion(version);
+        }
+    }
+
+    private void testWriteLedgerMetadataInvalidVersion(Version invalidVersion) throws Exception {
+        long ledgerId = System.currentTimeMillis();
+
+        metadata.setVersion(invalidVersion);
+
+        GenericCallbackFuture<Void> callbackFuture = new GenericCallbackFuture<>();
+        ledgerManager.writeLedgerMetadata(ledgerId, metadata, callbackFuture);
+        try {
+            result(callbackFuture);
+            fail("Should fail on writing ledger metadata if an invalid version is provided.");
+        } catch (BKException bke) {
+            assertEquals(Code.MetadataVersionException, bke.getCode());
+        }
+
+        verify(mockZk, times(0))
+            .setData(anyString(), any(byte[].class), anyInt(), any(StatCallback.class), any());
+    }
+
+    @Test
+    public void testLedgerMetadataListener() throws Exception {
+        long ledgerId = System.currentTimeMillis();
+        String ledgerStr = String.valueOf(ledgerId);
+
+        LinkedBlockingQueue<LedgerMetadata> changes = new LinkedBlockingQueue<>();
+        LedgerMetadataListener listener = (ledgerId1, metadata) -> changes.add(metadata);
+
+        metadata.setVersion(new LongVersion(1234L));
+        Stat stat = mock(Stat.class);
+        when(stat.getVersion()).thenReturn(1234);
+        when(stat.getCtime()).thenReturn(metadata.getCtime());
+        mockZkGetData(
+            ledgerStr, true,
+            KeeperException.Code.OK.intValue(), metadata.serialize(), stat);
+
+        ledgerManager.registerLedgerMetadataListener(ledgerId, listener);
+
+        // the listener will be notified with first get
+        LedgerMetadata change1 = changes.take();
+        assertEquals(metadata, change1);
+        verify(mockZk, times(1))
+            .getData(anyString(), any(Watcher.class), any(DataCallback.class), any());
+
+        // the watcher is registered for receiving watched event
+        assertTrue(watchers.containsKey(ledgerStr));
+        Set<Watcher> watcherSet1 = watchers.get(ledgerStr);
+        assertEquals(1, watcherSet1.size());
+        Watcher registeredWatcher1 = watcherSet1.stream().findFirst().get();
+
+        // mock get data to return an updated metadata
+        metadata.setVersion(new LongVersion(1235L));
+        when(stat.getVersion()).thenReturn(1235);
+        mockZkGetData(
+            ledgerStr, true,
+            KeeperException.Code.OK.intValue(), metadata.serialize(), stat);
+
+        // notify the watcher event
+        notifyWatchedEvent(
+            EventType.NodeDataChanged, KeeperState.SyncConnected, ledgerStr);
+
+        // the listener should receive an updated metadata
+        LedgerMetadata change2 = changes.take();
+        assertEquals(metadata, change2);
+        verify(mockZk, times(2))
+            .getData(anyString(), any(Watcher.class), any(DataCallback.class), any());
+
+        // after the listener receive an updated metadata, a new watcher should be registered
+        // for subsequent changes again.
+        assertTrue(watchers.containsKey(ledgerStr));
+        Set<Watcher> watcherSet2 = watchers.get(ledgerStr);
+        assertEquals(1, watcherSet2.size());
+        Watcher registeredWatcher2 = watcherSet2.stream().findFirst().get();
+
+        // zookeeper watchers are same, since there is only one giant watcher per ledger manager.
+        assertSame(registeredWatcher1, registeredWatcher2);
+
+        // verify scheduler
+        verify(scheduler, times(2)).submit(any(Runnable.class));
+        verify(scheduler, times(0))
+            .schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
+    }
+
+    @Test
+    public void testLedgerMetadataListenerOnLedgerDeleted() throws Exception {
+        long ledgerId = System.currentTimeMillis();
+        String ledgerStr = String.valueOf(ledgerId);
+
+        LinkedBlockingQueue<Optional<LedgerMetadata>> changes = new LinkedBlockingQueue<>();
+        LedgerMetadataListener listener = (ledgerId1, metadata) -> changes.add(Optional.ofNullable(metadata));
+
+        metadata.setVersion(new LongVersion(1234L));
+        Stat stat = mock(Stat.class);
+        when(stat.getVersion()).thenReturn(1234);
+        when(stat.getCtime()).thenReturn(metadata.getCtime());
+        mockZkGetData(
+            ledgerStr, true,
+            KeeperException.Code.OK.intValue(), metadata.serialize(), stat);
+
+        ledgerManager.registerLedgerMetadataListener(ledgerId, listener);
+        assertTrue(ledgerManager.listeners.containsKey(ledgerId));
+
+        // the listener will be notified with first get
+        LedgerMetadata change1 = changes.take().get();
+        assertEquals(metadata, change1);
+        verify(mockZk, times(1))
+            .getData(anyString(), any(Watcher.class), any(DataCallback.class), any());
+
+        // the watcher is registered for receiving watched event
+        assertTrue(watchers.containsKey(ledgerStr));
+
+        // mock get data to simulate an ledger is deleted
+        mockZkGetData(
+            ledgerStr, true,
+            KeeperException.Code.NONODE.intValue(), null, null);
+
+        // notify the watcher event
+        notifyWatchedEvent(
+            EventType.NodeDataChanged, KeeperState.SyncConnected, ledgerStr);
+
+        // the listener should be removed from listener set and not receive an updated metadata anymore
+        Optional<LedgerMetadata> change2 = changes.take();
+        assertFalse(change2.isPresent());
+        assertFalse(ledgerManager.listeners.containsKey(ledgerId));
+
+        // verify scheduler: the listener is only triggered once
+        verify(scheduler, times(1)).submit(any(Runnable.class));
+        verify(scheduler, times(0)).schedule(
+            any(Runnable.class), anyLong(), any(TimeUnit.class));
+
+        // no watcher is registered
+        assertFalse(watchers.containsKey(ledgerStr));
+    }
+
+    @Test
+    public void testLedgerMetadataListenerOnLedgerDeletedEvent() throws Exception {
+        long ledgerId = System.currentTimeMillis();
+        String ledgerStr = String.valueOf(ledgerId);
+
+        LinkedBlockingQueue<Optional<LedgerMetadata>> changes = new LinkedBlockingQueue<>();
+        LedgerMetadataListener listener = (ledgerId1, metadata) -> changes.add(Optional.ofNullable(metadata));
+
+        metadata.setVersion(new LongVersion(1234L));
+        Stat stat = mock(Stat.class);
+        when(stat.getVersion()).thenReturn(1234);
+        when(stat.getCtime()).thenReturn(metadata.getCtime());
+        mockZkGetData(
+            ledgerStr, true,
+            KeeperException.Code.OK.intValue(), metadata.serialize(), stat);
+
+        ledgerManager.registerLedgerMetadataListener(ledgerId, listener);
+        assertTrue(ledgerManager.listeners.containsKey(ledgerId));
+
+        // the listener will be notified with first get
+        LedgerMetadata change1 = changes.take().get();
+        assertEquals(metadata, change1);
+        verify(mockZk, times(1))
+            .getData(anyString(), any(Watcher.class), any(DataCallback.class), any());
+
+        // the watcher is registered for receiving watched event
+        assertTrue(watchers.containsKey(ledgerStr));
+
+        // notify the watcher event
+        notifyWatchedEvent(
+            EventType.NodeDeleted, KeeperState.SyncConnected, ledgerStr);
+
+        // the listener should be removed from listener set and a null change is notified.
+        Optional<LedgerMetadata> change2 = changes.take();
+        assertFalse(change2.isPresent());
+        // no more `getData` is called.
+        verify(mockZk, times(1))
+            .getData(anyString(), any(Watcher.class), any(DataCallback.class), any());
+        // listener is automatically unregistered after a ledger is deleted.
+        assertFalse(ledgerManager.listeners.containsKey(ledgerId));
+    }
+
+    @Test
+    public void testLedgerMetadataListenerOnRetries() throws Exception {
+        long ledgerId = System.currentTimeMillis();
+        String ledgerStr = String.valueOf(ledgerId);
+
+        LinkedBlockingQueue<LedgerMetadata> changes = new LinkedBlockingQueue<>();
+        LedgerMetadataListener listener = (ledgerId1, metadata) -> changes.add(metadata);
+
+        metadata.setVersion(new LongVersion(1234L));
+        Stat stat = mock(Stat.class);
+        when(stat.getVersion()).thenReturn(1234);
+        when(stat.getCtime()).thenReturn(metadata.getCtime());
+
+        // fail the first get, so the ledger manager will retry get data again.
+        mockZkGetData(
+            ledgerStr, true,
+            KeeperException.Code.SESSIONEXPIRED.intValue(), null, null);
+
+        ledgerManager.registerLedgerMetadataListener(ledgerId, listener);
+        assertTrue(ledgerManager.listeners.containsKey(ledgerId));
+
+        // the listener will not be notified with any updates
+        assertNull(changes.poll());
+        // an retry task is scheduled
+        verify(scheduler, times(1))
+            .schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
+        // zookeeper is called once
+        verify(mockZk, times(1))
+            .getData(anyString(), any(Watcher.class), any(DataCallback.class), any());
+        // watcher is not registered since getData call is failed
+        assertFalse(watchers.containsKey(ledgerStr));
+
+        // mock get data to return a valid response
+        mockZkGetData(
+            ledgerStr, true,
+            KeeperException.Code.OK.intValue(), metadata.serialize(), stat);
+
+        schedulerController.advance(Duration.ofMillis(ZK_CONNECT_BACKOFF_MS));
+
+        // the listener will be notified with first get
+        LedgerMetadata change = changes.take();
+        assertEquals(metadata, change);
+        verify(mockZk, times(2))
+            .getData(anyString(), any(Watcher.class), any(DataCallback.class), any());
+
+        // watcher is registered after successfully `getData`
+        assertTrue(watchers.containsKey(ledgerStr));
+    }
+
+    @Test
+    public void testLedgerMetadataListenerOnSessionExpired() throws Exception {
+        long ledgerId = System.currentTimeMillis();
+        String ledgerStr = String.valueOf(ledgerId);
+
+        LinkedBlockingQueue<LedgerMetadata> changes = new LinkedBlockingQueue<>();
+        LedgerMetadataListener listener = (ledgerId1, metadata) -> changes.add(metadata);
+
+        metadata.setVersion(new LongVersion(1234L));
+        Stat stat = mock(Stat.class);
+        when(stat.getVersion()).thenReturn(1234);
+        when(stat.getCtime()).thenReturn(metadata.getCtime());
+        mockZkGetData(
+            ledgerStr, true,
+            KeeperException.Code.OK.intValue(), metadata.serialize(), stat);
+
+        ledgerManager.registerLedgerMetadataListener(ledgerId, listener);
+
+        // the listener will be notified with first get
+        LedgerMetadata change1 = changes.take();
+        assertEquals(metadata, change1);
+        verify(mockZk, times(1))
+            .getData(anyString(), any(Watcher.class), any(DataCallback.class), any());
+
+        // the watcher is registered for receiving watched event
+        assertTrue(watchers.containsKey(ledgerStr));
+        Set<Watcher> watcherSet1 = watchers.get(ledgerStr);
+        assertEquals(1, watcherSet1.size());
+        Watcher registeredWatcher1 = watcherSet1.stream().findFirst().get();
+
+        // simulate session expired
+        notifyWatchedEvent(
+            EventType.None, KeeperState.Expired, ledgerStr);
+
+        // ledger manager will retry to read metadata again
+        LedgerMetadata change2 = changes.take();
+        assertEquals(metadata, change2);
+        verify(mockZk, times(2))
+            .getData(anyString(), any(Watcher.class), any(DataCallback.class), any());
+
+        // the watcher is registered for receiving watched event
+        assertTrue(watchers.containsKey(ledgerStr));
+        Set<Watcher> watcherSet2 = watchers.get(ledgerStr);
+        assertEquals(1, watcherSet2.size());
+        Watcher registeredWatcher2 = watcherSet2.stream().findFirst().get();
+
+        assertSame(registeredWatcher1, registeredWatcher2);
+    }
+
+    @Test
+    public void testUnregisterLedgerMetadataListener() throws Exception {
+        long ledgerId = System.currentTimeMillis();
+        String ledgerStr = String.valueOf(ledgerId);
+
+        LinkedBlockingQueue<LedgerMetadata> changes = new LinkedBlockingQueue<>();
+        LedgerMetadataListener listener = (ledgerId1, metadata) -> changes.add(metadata);
+
+        metadata.setVersion(new LongVersion(1234L));
+        Stat stat = mock(Stat.class);
+        when(stat.getVersion()).thenReturn(1234);
+        when(stat.getCtime()).thenReturn(metadata.getCtime());
+        mockZkGetData(
+            ledgerStr, true,
+            KeeperException.Code.OK.intValue(), metadata.serialize(), stat);
+
+        ledgerManager.registerLedgerMetadataListener(ledgerId, listener);
+        assertTrue(ledgerManager.listeners.containsKey(ledgerId));
+
+        // the listener will be notified with first get
+        LedgerMetadata change1 = changes.take();
+        assertEquals(metadata, change1);
+        verify(mockZk, times(1))
+            .getData(anyString(), any(Watcher.class), any(DataCallback.class), any());
+
+        // the watcher is registered for receiving watched event
+        assertTrue(watchers.containsKey(ledgerStr));
+        Set<Watcher> watcherSet1 = watchers.get(ledgerStr);
+        assertEquals(1, watcherSet1.size());
+        Watcher registeredWatcher1 = watcherSet1.stream().findFirst().get();
+
+        // mock get data to return an updated metadata
+        metadata.setVersion(new LongVersion(1235L));
+        when(stat.getVersion()).thenReturn(1235);
+        mockZkGetData(
+            ledgerStr, true,
+            KeeperException.Code.OK.intValue(), metadata.serialize(), stat);
+
+        // unregister the listener
+        ledgerManager.unregisterLedgerMetadataListener(ledgerId, listener);
+        assertFalse(ledgerManager.listeners.containsKey(ledgerId));
+
+        // notify the watcher event
+        notifyWatchedEvent(
+            EventType.NodeDataChanged, KeeperState.SyncConnected, ledgerStr);
+
+        // since listener is already unregistered so no more `getData` is issued.
+        assertNull(changes.poll());
+        verify(mockZk, times(1))
+            .getData(anyString(), any(Watcher.class), any(DataCallback.class), any());
+    }
+
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestCallbacks.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestCallbacks.java
index 328487d56..23b83bd4e 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestCallbacks.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestCallbacks.java
@@ -22,6 +22,7 @@
 
 import com.google.common.util.concurrent.AbstractFuture;
 
+import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.LedgerHandle;
@@ -40,13 +41,13 @@
      * Generic callback future.
      */
     public static class GenericCallbackFuture<T>
-        extends AbstractFuture<T> implements GenericCallback<T> {
+        extends CompletableFuture<T> implements GenericCallback<T> {
         @Override
         public void operationComplete(int rc, T value) {
             if (rc != BKException.Code.OK) {
-                setException(BKException.create(rc));
+                completeExceptionally(BKException.create(rc));
             } else {
-                set(value);
+                complete(value);
             }
         }
     }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/MockZooKeeperTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/MockZooKeeperTestCase.java
new file mode 100644
index 000000000..eb282296c
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/MockZooKeeperTestCase.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.zookeeper;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+
+import com.google.common.collect.Maps;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.bookkeeper.client.api.BKException.Code;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.zookeeper.AsyncCallback.DataCallback;
+import org.apache.zookeeper.AsyncCallback.StatCallback;
+import org.apache.zookeeper.AsyncCallback.StringCallback;
+import org.apache.zookeeper.AsyncCallback.VoidCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.powermock.api.mockito.PowerMockito;
+
+/**
+ * A test base that provides mocked zookeeper.
+ */
+public abstract class MockZooKeeperTestCase {
+
+    protected final ConcurrentMap<String, Set<Watcher>> watchers = Maps.newConcurrentMap();
+    protected ZooKeeper mockZk;
+
+    protected void setup() throws Exception {
+        this.mockZk = mock(ZooKeeper.class);
+
+        PowerMockito.mockStatic(ZkUtils.class);
+    }
+
+    private void addWatcher(String path, Watcher watcher) {
+        if (null == watcher) {
+            return;
+        }
+        Set<Watcher> watcherSet = watchers.get(path);
+        if (null == watcherSet) {
+            watcherSet = new HashSet<>();
+            watchers.put(path, watcherSet);
+        }
+        watcherSet.add(watcher);
+    }
+
+    protected void mockZkUtilsAsyncCreateFullPathOptimistic(
+        String expectedLedgerPath,
+        CreateMode expectedCreateMode,
+        int retCode,
+        String retCreatedZnodeName
+    ) throws Exception {
+
+        PowerMockito.doAnswer(invocationOnMock -> {
+            String path = invocationOnMock.getArgument(1);
+            StringCallback callback = invocationOnMock.getArgument(5);
+            Object ctx = invocationOnMock.getArgument(6);
+
+            callback.processResult(
+                retCode, path, ctx, retCreatedZnodeName);
+            return null;
+        }).when(
+            ZkUtils.class,
+            "asyncCreateFullPathOptimistic",
+            eq(mockZk),
+            eq(expectedLedgerPath),
+            any(byte[].class),
+            anyList(),
+            eq(expectedCreateMode),
+            any(StringCallback.class),
+            any());
+
+    }
+
+    protected void mockZkDelete(
+        String expectedLedgerPath,
+        int expectedVersion,
+        int retCode
+    ) throws Exception {
+
+        doAnswer(invocationOnMock -> {
+            String path = invocationOnMock.getArgument(0);
+            VoidCallback callback = invocationOnMock.getArgument(2);
+            Object ctx = invocationOnMock.getArgument(3);
+
+            callback.processResult(
+                retCode, path, ctx
+            );
+
+            return null;
+        }).when(mockZk).delete(
+            eq(expectedLedgerPath),
+            eq(expectedVersion),
+            any(VoidCallback.class),
+            any());
+
+    }
+
+    protected void mockZkUtilsAsyncDeleteFullPathOptimistic(
+        String expectedLedgerPath,
+        int expectedZnodeVersion,
+        int retCode
+    ) throws Exception {
+
+        PowerMockito.doAnswer(invocationOnMock -> {
+            String path = invocationOnMock.getArgument(1);
+            VoidCallback callback = invocationOnMock.getArgument(3);
+
+            callback.processResult(
+                retCode, path, null);
+            return null;
+        }).when(
+            ZkUtils.class,
+            "asyncDeleteFullPathOptimistic",
+            eq(mockZk),
+            eq(expectedLedgerPath),
+            eq(expectedZnodeVersion),
+            any(VoidCallback.class),
+            eq(expectedLedgerPath));
+
+    }
+
+    protected void mockZkGetData(
+        String expectedLedgerPath,
+        boolean expectedWatcher,
+        int retCode,
+        byte[] retData,
+        Stat retStat
+    ) throws Exception {
+
+        doAnswer(invocationOnMock -> {
+            String path = invocationOnMock.getArgument(0);
+            Watcher watcher = invocationOnMock.getArgument(1);
+            DataCallback callback = invocationOnMock.getArgument(2);
+            Object ctx = invocationOnMock.getArgument(3);
+
+            if (Code.OK == retCode) {
+                addWatcher(path, watcher);
+            }
+
+            callback.processResult(
+                retCode, path, ctx, retData, retStat
+            );
+
+            return null;
+        }).when(mockZk).getData(
+            eq(expectedLedgerPath),
+            expectedWatcher ? any(Watcher.class) : eq(null),
+            any(DataCallback.class),
+            any());
+
+    }
+
+    protected void mockZkSetData(
+        String expectedLedgerPath,
+        byte[] expectedBytes,
+        int expectedVersion,
+        int retCode,
+        Stat retStat
+    ) throws Exception {
+
+        doAnswer(invocationOnMock -> {
+            String path = invocationOnMock.getArgument(0);
+            StatCallback callback = invocationOnMock.getArgument(3);
+            Object ctx = invocationOnMock.getArgument(4);
+
+            callback.processResult(
+                retCode, path, ctx, retStat
+            );
+
+            return null;
+        }).when(mockZk).setData(
+            eq(expectedLedgerPath),
+            eq(expectedBytes),
+            eq(expectedVersion),
+            any(StatCallback.class),
+            any());
+
+    }
+
+    protected boolean notifyWatchedEvent(EventType eventType,
+                                         KeeperState keeperState,
+                                         String path) {
+        Set<Watcher> watcherSet = watchers.remove(path);
+        if (null == watcherSet) {
+            return false;
+        }
+        WatchedEvent event = new WatchedEvent(
+            eventType, keeperState, path);
+        for (Watcher watcher : watcherSet) {
+            watcher.process(event);
+        }
+        return true;
+    }
+
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services