You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/03/12 23:29:44 UTC

[bookkeeper] branch branch-4.6 updated: handle zookeeper session expire in zk ledger manager

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-4.6
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/branch-4.6 by this push:
     new d0c9a9a  handle zookeeper session expire in zk ledger manager
d0c9a9a is described below

commit d0c9a9a2c7d6e8a606b1edaf71936b7fa1df95eb
Author: Sijie Guo <si...@apache.org>
AuthorDate: Sat Feb 17 15:51:13 2018 +0800

    handle zookeeper session expire in zk ledger manager
    
    Descriptions of the changes in this PR:
    
    Cherry-pick of twitter/bookkeeper@dfcda5cc2efdc03db99fe126499f8e3347f50484.
    
    This includes a test case for `AbstractZkLedgerManager`, covering:
    
        1) create/delete/read/write ledger metadata
        2) register/unregister listeners
        3) handling various watched events
    
    This also fixes a couple of issues in `AbstractZkLedgerManager` that were caught by the new test case.
    
    Author: Sijie Guo <si...@apache.org>
    Author: Sijie Guo <si...@twitter.com>
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>, Venkateswararao Jujjuri (JV) <None>
    
    This closes #1130 from sijie/fix_session_expires
---
 .../apache/bookkeeper/client/LedgerMetadata.java   |   2 +
 .../bookkeeper/meta/AbstractZkLedgerManager.java   |  23 +-
 .../apache/bookkeeper/versioning/LongVersion.java  |   3 +
 .../meta/AbstractZkLedgerManagerTest.java          | 883 +++++++++++++++++++++
 .../org/apache/bookkeeper/test/TestCallbacks.java  |  10 +-
 .../zookeeper/MockZooKeeperTestCase.java           | 221 ++++++
 6 files changed, 1135 insertions(+), 7 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
index 180c59e..8d747a8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerMetadata.java
@@ -39,6 +39,7 @@ import java.util.NavigableMap;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import lombok.EqualsAndHashCode;
 import org.apache.bookkeeper.client.api.DigestType;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat;
@@ -52,6 +53,7 @@ import org.slf4j.LoggerFactory;
  *
  * <p>It provides parsing and serialization methods of such metadata.
  */
+@EqualsAndHashCode
 public class LedgerMetadata implements org.apache.bookkeeper.client.api.LedgerMetadata {
     static final Logger LOG = LoggerFactory.getLogger(LedgerMetadata.class);
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
index c93a8a9..9a0cb81 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
@@ -17,6 +17,7 @@
  */
 package org.apache.bookkeeper.meta;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.io.IOException;
@@ -64,7 +65,8 @@ abstract class AbstractZkLedgerManager implements LedgerManager, Watcher {
 
     private final static Logger LOG = LoggerFactory.getLogger(AbstractZkLedgerManager.class);
 
-    static int ZK_CONNECT_BACKOFF_MS = 200;
+    @VisibleForTesting
+    static final int ZK_CONNECT_BACKOFF_MS = 200;
 
     protected final AbstractConfiguration conf;
     protected final ZooKeeper zk;
@@ -125,6 +127,13 @@ abstract class AbstractZkLedgerManager implements LedgerManager, Watcher {
                         LOG.debug("Removed ledger metadata listener set on ledger {} as its ledger is deleted : {}",
                                 ledgerId, listenerSet.size());
                     }
+                    // notify `null` as indicator that a ledger is deleted
+                    // make this behavior consistent with `NodeDeleted` watched event.
+                    synchronized (listenerSet) {
+                        for (LedgerMetadataListener listener : listenerSet) {
+                            listener.onChanged(ledgerId, null);
+                        }
+                    }
                 }
             } else {
                 LOG.warn("Failed on read ledger metadata of ledger {} : {}", ledgerId, rc);
@@ -175,8 +184,16 @@ abstract class AbstractZkLedgerManager implements LedgerManager, Watcher {
 
     @Override
     public void process(WatchedEvent event) {
-        LOG.info("Received watched event {} from zookeeper based ledger manager.", event);
+        LOG.debug("Received watched event {} from zookeeper based ledger manager.", event);
         if (Event.EventType.None == event.getType()) {
+            if (Event.KeeperState.Expired == event.getState()) {
+                LOG.info("ZooKeeper client expired on ledger manager.");
+                Set<Long> keySet = new HashSet<Long>(listeners.keySet());
+                for (Long lid : keySet) {
+                    scheduler.submit(new ReadLedgerMetadataTask(lid));
+                    LOG.info("Re-read ledger metadata for {} after zookeeper session expired.", lid);
+                }
+            }
             return;
         }
         String path = event.getPath();
@@ -401,7 +418,7 @@ abstract class AbstractZkLedgerManager implements LedgerManager, Watcher {
     public void writeLedgerMetadata(final long ledgerId, final LedgerMetadata metadata,
                                     final GenericCallback<Void> cb) {
         Version v = metadata.getVersion();
-        if (Version.NEW == v || !(v instanceof LongVersion)) {
+        if (!(v instanceof LongVersion)) {
             cb.operationComplete(BKException.Code.MetadataVersionException, null);
             return;
         }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/versioning/LongVersion.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/versioning/LongVersion.java
index 4ec4388..862977a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/versioning/LongVersion.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/versioning/LongVersion.java
@@ -17,9 +17,12 @@
  */
 package org.apache.bookkeeper.versioning;
 
+import lombok.EqualsAndHashCode;
+
 /**
  * A version object holds integer version.
  */
+@EqualsAndHashCode
 public class LongVersion implements Version {
     protected long version;
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java
new file mode 100644
index 0000000..b7c8ef5
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/AbstractZkLedgerManagerTest.java
@@ -0,0 +1,883 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.meta;
+
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
+import static org.apache.bookkeeper.meta.AbstractZkLedgerManager.ZK_CONNECT_BACKOFF_MS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.CALLS_REAL_METHODS;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.withSettings;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BKException.Code;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.LedgerMetadata;
+import org.apache.bookkeeper.common.testing.executors.MockExecutorController;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataListener;
+import org.apache.bookkeeper.test.TestCallbacks.GenericCallbackFuture;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.bookkeeper.versioning.LongVersion;
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.bookkeeper.zookeeper.MockZooKeeperTestCase;
+import org.apache.zookeeper.AsyncCallback.DataCallback;
+import org.apache.zookeeper.AsyncCallback.StatCallback;
+import org.apache.zookeeper.AsyncCallback.VoidCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.data.Stat;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+/**
+ * Unit test of {@link AbstractZkLedgerManager}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ AbstractZkLedgerManager.class, ZkUtils.class })
+public class AbstractZkLedgerManagerTest extends MockZooKeeperTestCase {
+
+    private ClientConfiguration conf;
+    private AbstractZkLedgerManager ledgerManager;
+    private ScheduledExecutorService scheduler;
+    private MockExecutorController schedulerController;
+    private LedgerMetadata metadata;
+
+    @Before
+    public void setup() throws Exception {
+        PowerMockito.mockStatic(Executors.class);
+
+        super.setup();
+
+        this.scheduler = PowerMockito.mock(ScheduledExecutorService.class);
+        this.schedulerController = new MockExecutorController()
+            .controlSubmit(scheduler)
+            .controlSchedule(scheduler)
+            .controlExecute(scheduler)
+            .controlScheduleAtFixedRate(scheduler, 10);
+        PowerMockito.when(Executors.newSingleThreadScheduledExecutor(any()))
+            .thenReturn(scheduler);
+
+        this.conf = new ClientConfiguration();
+        this.ledgerManager = mock(
+            AbstractZkLedgerManager.class,
+            withSettings()
+                .useConstructor(conf, mockZk)
+                .defaultAnswer(CALLS_REAL_METHODS));
+        this.metadata = new LedgerMetadata(
+            5, 3, 3,
+            DigestType.CRC32,
+            new byte[0],
+            Collections.emptyMap(),
+            false);
+
+        doAnswer(invocationOnMock -> {
+            long ledgerId = invocationOnMock.getArgument(0);
+            return String.valueOf(ledgerId);
+        }).when(ledgerManager).getLedgerPath(anyLong());
+        doAnswer(invocationOnMock -> {
+            String ledgerStr = invocationOnMock.getArgument(0);
+            return Long.parseLong(ledgerStr);
+        }).when(ledgerManager).getLedgerId(anyString());
+
+        // verify constructor
+        assertEquals(conf.getZkLedgersRootPath(), ledgerManager.ledgerRootPath);
+        assertSame(mockZk, ledgerManager.zk);
+        assertSame(conf, ledgerManager.conf);
+        assertSame(scheduler, ledgerManager.scheduler);
+    }
+
+    @After
+    public void teardown() throws Exception {
+        if (null != ledgerManager) {
+            ledgerManager.close();
+
+            // zookeeper is passed in, it should not be closed.
+            verify(mockZk, times(0)).close();
+            verify(scheduler, times(1)).shutdown();
+        }
+    }
+
+    @Test
+    public void testCreateLedgerMetadataSuccess() throws Exception {
+        long ledgerId = System.currentTimeMillis();
+        String ledgerStr = String.valueOf(ledgerId);
+        mockZkUtilsAsyncCreateFullPathOptimistic(
+            ledgerStr, CreateMode.PERSISTENT,
+            KeeperException.Code.OK.intValue(), ledgerStr
+        );
+
+        assertEquals(Version.NEW, metadata.getVersion());
+
+        GenericCallbackFuture<Void> callbackFuture = new GenericCallbackFuture<>();
+        ledgerManager.createLedgerMetadata(ledgerId, metadata, callbackFuture);
+        callbackFuture.get();
+
+        assertEquals(new LongVersion(0), metadata.getVersion());
+    }
+
+    @Test
+    public void testCreateLedgerMetadataNodeExists() throws Exception {
+        long ledgerId = System.currentTimeMillis();
+        String ledgerStr = String.valueOf(ledgerId);
+        mockZkUtilsAsyncCreateFullPathOptimistic(
+            ledgerStr, CreateMode.PERSISTENT,
+            KeeperException.Code.NODEEXISTS.intValue(), null);
+
+        assertEquals(Version.NEW, metadata.getVersion());
+
+        GenericCallbackFuture<Void> callbackFuture = new GenericCallbackFuture<>();
+        ledgerManager.createLedgerMetadata(ledgerId, metadata, callbackFuture);
+        try {
+            result(callbackFuture);
+            fail("Should fail to create ledger metadata if the ledger already exists");
+        } catch (Exception e) {
+            assertTrue(e instanceof BKException);
+            BKException bke = (BKException) e;
+            assertEquals(Code.LedgerExistException, bke.getCode());
+        }
+
+        // creation failed, so metadata should not be modified
+        assertEquals(Version.NEW, metadata.getVersion());
+    }
+
+    @Test
+    public void testCreateLedgerMetadataException() throws Exception {
+        long ledgerId = System.currentTimeMillis();
+        String ledgerStr = String.valueOf(ledgerId);
+        mockZkUtilsAsyncCreateFullPathOptimistic(
+            ledgerStr, CreateMode.PERSISTENT,
+            KeeperException.Code.CONNECTIONLOSS.intValue(), null);
+
+        assertEquals(Version.NEW, metadata.getVersion());
+
+        GenericCallbackFuture<Void> callbackFuture = new GenericCallbackFuture<>();
+        ledgerManager.createLedgerMetadata(ledgerId, metadata, callbackFuture);
+        try {
+            result(callbackFuture);
+            fail("Should fail to create ledger metadata when encountering zookeeper exception");
+        } catch (Exception e) {
+            assertTrue(e instanceof BKException);
+            BKException bke = (BKException) e;
+            assertEquals(Code.ZKException, bke.getCode());
+        }
+
+        // creation failed, so metadata should not be modified
+        assertEquals(Version.NEW, metadata.getVersion());
+    }
+
+    @Test
+    public void testRemoveLedgerMetadataSuccess() throws Exception {
+        long ledgerId = System.currentTimeMillis();
+        String ledgerStr = String.valueOf(ledgerId);
+        LongVersion version = new LongVersion(1234L);
+
+        mockZkDelete(
+            ledgerStr, (int) version.getLongVersion(),
+            KeeperException.Code.OK.intValue());
+
+        GenericCallbackFuture<Void> callbackFuture = new GenericCallbackFuture<>();
+        ledgerManager.removeLedgerMetadata(ledgerId, version, callbackFuture);
+        result(callbackFuture);
+
+        verify(mockZk, times(1))
+            .delete(eq(ledgerStr), eq(1234), any(VoidCallback.class), eq(null));
+    }
+
+    @Test
+    public void testRemoveLedgerMetadataVersionAny() throws Exception {
+        long ledgerId = System.currentTimeMillis();
+        String ledgerStr = String.valueOf(ledgerId);
+
+        mockZkDelete(
+            ledgerStr, -1,
+            KeeperException.Code.OK.intValue());
+
+        GenericCallbackFuture<Void> callbackFuture = new GenericCallbackFuture<>();
+        ledgerManager.removeLedgerMetadata(ledgerId, Version.ANY, callbackFuture);
+        result(callbackFuture);
+
+        verify(mockZk, times(1))
+            .delete(eq(ledgerStr), eq(-1), any(VoidCallback.class), eq(null));
+    }
+
+    @Test
+    public void testRemoveLedgerMetadataVersionNew() throws Exception {
+        testRemoveLedgerMetadataInvalidVersion(Version.NEW);
+    }
+
+    @Test
+    public void testRemoveLedgerMetadataUnknownVersionType() throws Exception {
+        Version version = mock(Version.class);
+        testRemoveLedgerMetadataInvalidVersion(version);
+    }
+
+    private void testRemoveLedgerMetadataInvalidVersion(Version version) throws Exception {
+        long ledgerId = System.currentTimeMillis();
+
+        GenericCallbackFuture<Void> callbackFuture = new GenericCallbackFuture<>();
+        ledgerManager.removeLedgerMetadata(ledgerId, version, callbackFuture);
+        try {
+            result(callbackFuture);
+            fail("Should fail to remove metadata if version is " + Version.NEW);
+        } catch (BKException bke) {
+            assertEquals(Code.MetadataVersionException, bke.getCode());
+        }
+    }
+
+    @Test
+    public void testRemoveLedgerMetadataNoNode() throws Exception {
+        long ledgerId = System.currentTimeMillis();
+        String ledgerStr = String.valueOf(ledgerId);
+        LongVersion version = new LongVersion(1234L);
+
+        mockZkDelete(
+            ledgerStr, (int) version.getLongVersion(),
+            KeeperException.Code.NONODE.intValue());
+
+        GenericCallbackFuture<Void> callbackFuture = new GenericCallbackFuture<>();
+        ledgerManager.removeLedgerMetadata(ledgerId, version, callbackFuture);
+        try {
+            result(callbackFuture);
+            fail("Should fail to remove metadata if no such ledger exists");
+        } catch (BKException bke) {
+            assertEquals(Code.NoSuchLedgerExistsException, bke.getCode());
+        }
+
+        verify(mockZk, times(1))
+            .delete(eq(ledgerStr), eq(1234), any(VoidCallback.class), eq(null));
+    }
+
+    @Test
+    public void testRemoveLedgerMetadataException() throws Exception {
+        long ledgerId = System.currentTimeMillis();
+        String ledgerStr = String.valueOf(ledgerId);
+        LongVersion version = new LongVersion(1234L);
+
+        mockZkDelete(
+            ledgerStr, (int) version.getLongVersion(),
+            KeeperException.Code.CONNECTIONLOSS.intValue());
+
+        GenericCallbackFuture<Void> callbackFuture = new GenericCallbackFuture<>();
+        ledgerManager.removeLedgerMetadata(ledgerId, version, callbackFuture);
+        try {
+            result(callbackFuture);
+            fail("Should fail to remove metadata if no such ledger exists");
+        } catch (BKException bke) {
+            assertEquals(Code.ZKException, bke.getCode());
+        }
+
+        verify(mockZk, times(1))
+            .delete(eq(ledgerStr), eq(1234), any(VoidCallback.class), eq(null));
+    }
+
+    @Test
+    public void testRemoveLedgerMetadataHierarchical() throws Exception {
+        HierarchicalLedgerManager hlm = new HierarchicalLedgerManager(conf, mockZk);
+        testRemoveLedgerMetadataHierarchicalLedgerManager(hlm);
+    }
+
+    @Test
+    public void testRemoveLedgerMetadataLongHierarchical() throws Exception {
+        LongHierarchicalLedgerManager hlm = new LongHierarchicalLedgerManager(conf, mockZk);
+        testRemoveLedgerMetadataHierarchicalLedgerManager(hlm);
+    }
+
+    private void testRemoveLedgerMetadataHierarchicalLedgerManager(AbstractZkLedgerManager lm) throws Exception {
+        long ledgerId = System.currentTimeMillis();
+        String ledgerStr = lm.getLedgerPath(ledgerId);
+        LongVersion version = new LongVersion(1234L);
+
+        mockZkUtilsAsyncDeleteFullPathOptimistic(
+            ledgerStr, (int) version.getLongVersion(),
+            KeeperException.Code.OK.intValue());
+
+        GenericCallbackFuture<Void> callbackFuture = new GenericCallbackFuture<>();
+        lm.removeLedgerMetadata(ledgerId, version, callbackFuture);
+        result(callbackFuture);
+
+        PowerMockito.verifyStatic(
+            ZkUtils.class, times(1));
+        ZkUtils.asyncDeleteFullPathOptimistic(
+            eq(mockZk), eq(ledgerStr), eq(1234), any(VoidCallback.class), eq(ledgerStr));
+
+    }
+
+    @Test
+    public void testReadLedgerMetadataSuccess() throws Exception {
+        long ledgerId = System.currentTimeMillis();
+        String ledgerStr = String.valueOf(ledgerId);
+
+        metadata.setVersion(new LongVersion(1234L));
+        Stat stat = mock(Stat.class);
+        when(stat.getVersion()).thenReturn(1234);
+        when(stat.getCtime()).thenReturn(metadata.getCtime());
+        mockZkGetData(
+            ledgerStr, false,
+            KeeperException.Code.OK.intValue(), metadata.serialize(), stat);
+
+        GenericCallbackFuture<LedgerMetadata> callbackFuture = new GenericCallbackFuture<>();
+        ledgerManager.readLedgerMetadata(ledgerId, callbackFuture);
+        LedgerMetadata readMetadata = result(callbackFuture);
+        assertEquals(metadata, readMetadata);
+
+        verify(mockZk, times(1))
+            .getData(eq(ledgerStr), eq(null), any(DataCallback.class), any());
+    }
+
+    @Test
+    public void testReadLedgerMetadataNoNode() throws Exception {
+        long ledgerId = System.currentTimeMillis();
+        String ledgerStr = String.valueOf(ledgerId);
+
+        mockZkGetData(
+            ledgerStr, false,
+            KeeperException.Code.NONODE.intValue(), null, null);
+
+        GenericCallbackFuture<LedgerMetadata> callbackFuture = new GenericCallbackFuture<>();
+        ledgerManager.readLedgerMetadata(ledgerId, callbackFuture);
+        try {
+            result(callbackFuture);
+            fail("Should fail on reading ledger metadata if a ledger doesn't exist");
+        } catch (BKException bke) {
+            assertEquals(Code.NoSuchLedgerExistsException, bke.getCode());
+        }
+
+        verify(mockZk, times(1))
+            .getData(eq(ledgerStr), eq(null), any(DataCallback.class), any());
+    }
+
+    @Test
+    public void testReadLedgerMetadataException() throws Exception {
+        long ledgerId = System.currentTimeMillis();
+        String ledgerStr = String.valueOf(ledgerId);
+
+        mockZkGetData(
+            ledgerStr, false,
+            KeeperException.Code.CONNECTIONLOSS.intValue(), null, null);
+
+        GenericCallbackFuture<LedgerMetadata> callbackFuture = new GenericCallbackFuture<>();
+        ledgerManager.readLedgerMetadata(ledgerId, callbackFuture);
+        try {
+            result(callbackFuture);
+            fail("Should fail on reading ledger metadata if a ledger doesn't exist");
+        } catch (BKException bke) {
+            assertEquals(Code.ZKException, bke.getCode());
+        }
+
+        verify(mockZk, times(1))
+            .getData(eq(ledgerStr), eq(null), any(DataCallback.class), any());
+    }
+
+    @Test
+    public void testReadLedgerMetadataStatMissing() throws Exception {
+        long ledgerId = System.currentTimeMillis();
+        String ledgerStr = String.valueOf(ledgerId);
+
+        mockZkGetData(
+            ledgerStr, false,
+            KeeperException.Code.OK.intValue(), metadata.serialize(), null);
+
+        GenericCallbackFuture<LedgerMetadata> callbackFuture = new GenericCallbackFuture<>();
+        ledgerManager.readLedgerMetadata(ledgerId, callbackFuture);
+        try {
+            result(callbackFuture);
+            fail("Should fail on reading ledger metadata if a ledger doesn't exist");
+        } catch (BKException bke) {
+            assertEquals(Code.ZKException, bke.getCode());
+        }
+
+        verify(mockZk, times(1))
+            .getData(eq(ledgerStr), eq(null), any(DataCallback.class), any());
+    }
+
+    @Test
+    public void testReadLedgerMetadataDataCorrupted() throws Exception {
+        long ledgerId = System.currentTimeMillis();
+        String ledgerStr = String.valueOf(ledgerId);
+
+        metadata.setVersion(new LongVersion(1234L));
+        Stat stat = mock(Stat.class);
+        when(stat.getVersion()).thenReturn(1234);
+        when(stat.getCtime()).thenReturn(metadata.getCtime());
+        mockZkGetData(
+            ledgerStr, false,
+            KeeperException.Code.OK.intValue(), new byte[0], stat);
+
+        GenericCallbackFuture<LedgerMetadata> callbackFuture = new GenericCallbackFuture<>();
+        ledgerManager.readLedgerMetadata(ledgerId, callbackFuture);
+        try {
+            result(callbackFuture);
+            fail("Should fail on reading ledger metadata if a ledger doesn't exist");
+        } catch (BKException bke) {
+            assertEquals(Code.ZKException, bke.getCode());
+        }
+
+        verify(mockZk, times(1))
+            .getData(eq(ledgerStr), eq(null), any(DataCallback.class), any());
+    }
+
+    @Test
+    public void testWriteLedgerMetadataSuccess() throws Exception {
+        long ledgerId = System.currentTimeMillis();
+        String ledgerStr = String.valueOf(ledgerId);
+
+        metadata.setVersion(new LongVersion(1234L));
+        Stat stat = mock(Stat.class);
+        when(stat.getVersion()).thenReturn(1235);
+        when(stat.getCtime()).thenReturn(metadata.getCtime());
+        mockZkSetData(
+            ledgerStr, metadata.serialize(), 1234,
+            KeeperException.Code.OK.intValue(), stat);
+
+        assertEquals(new LongVersion(1234L), metadata.getVersion());
+
+        GenericCallbackFuture<Void> callbackFuture = new GenericCallbackFuture<>();
+        ledgerManager.writeLedgerMetadata(ledgerId, metadata, callbackFuture);
+        result(callbackFuture);
+
+        assertEquals(new LongVersion(1235L), metadata.getVersion());
+
+        verify(mockZk, times(1))
+            .setData(eq(ledgerStr), any(byte[].class), eq(1234), any(StatCallback.class), any());
+    }
+
+    @Test
+    public void testWriteLedgerMetadataBadVersion() throws Exception {
+        long ledgerId = System.currentTimeMillis();
+        String ledgerStr = String.valueOf(ledgerId);
+
+        metadata.setVersion(new LongVersion(1234L));
+        mockZkSetData(
+            ledgerStr, metadata.serialize(), 1234,
+            KeeperException.Code.BADVERSION.intValue(), null);
+
+        assertEquals(new LongVersion(1234L), metadata.getVersion());
+
+        GenericCallbackFuture<Void> callbackFuture = new GenericCallbackFuture<>();
+        ledgerManager.writeLedgerMetadata(ledgerId, metadata, callbackFuture);
+        try {
+            result(callbackFuture);
+            fail("Should fail on writing ledger metadata if encountering bad version");
+        } catch (BKException bke) {
+            assertEquals(Code.MetadataVersionException, bke.getCode());
+        }
+
+        // version remain unchanged
+        assertEquals(new LongVersion(1234L), metadata.getVersion());
+
+        verify(mockZk, times(1))
+            .setData(eq(ledgerStr), any(byte[].class), eq(1234), any(StatCallback.class), any());
+    }
+
+    @Test
+    public void testWriteLedgerMetadataException() throws Exception {
+        long ledgerId = System.currentTimeMillis();
+        String ledgerStr = String.valueOf(ledgerId);
+
+        metadata.setVersion(new LongVersion(1234L));
+        mockZkSetData(
+            ledgerStr, metadata.serialize(), 1234,
+            KeeperException.Code.CONNECTIONLOSS.intValue(), null);
+
+        assertEquals(new LongVersion(1234L), metadata.getVersion());
+
+        GenericCallbackFuture<Void> callbackFuture = new GenericCallbackFuture<>();
+        ledgerManager.writeLedgerMetadata(ledgerId, metadata, callbackFuture);
+        try {
+            result(callbackFuture);
+            fail("Should fail on writing ledger metadata if encountering zookeeper exceptions");
+        } catch (BKException bke) {
+            assertEquals(Code.ZKException, bke.getCode());
+        }
+
+        // version remain unchanged
+        assertEquals(new LongVersion(1234L), metadata.getVersion());
+
+        verify(mockZk, times(1))
+            .setData(eq(ledgerStr), any(byte[].class), eq(1234), any(StatCallback.class), any());
+    }
+
+    @Test
+    public void testWriteLedgerMetadataInvalidVersion() throws Exception {
+        Version[] versions = new Version[] {
+            Version.NEW,
+            Version.ANY,
+            mock(Version.class)
+        };
+        for (Version version : versions) {
+            testWriteLedgerMetadataInvalidVersion(version);
+        }
+    }
+
+    private void testWriteLedgerMetadataInvalidVersion(Version invalidVersion) throws Exception {
+        long ledgerId = System.currentTimeMillis();
+
+        metadata.setVersion(invalidVersion);
+
+        GenericCallbackFuture<Void> callbackFuture = new GenericCallbackFuture<>();
+        ledgerManager.writeLedgerMetadata(ledgerId, metadata, callbackFuture);
+        try {
+            result(callbackFuture);
+            fail("Should fail on writing ledger metadata if an invalid version is provided.");
+        } catch (BKException bke) {
+            assertEquals(Code.MetadataVersionException, bke.getCode());
+        }
+
+        verify(mockZk, times(0))
+            .setData(anyString(), any(byte[].class), anyInt(), any(StatCallback.class), any());
+    }
+
+    @Test
+    public void testLedgerMetadataListener() throws Exception {
+        long ledgerId = System.currentTimeMillis();
+        String ledgerStr = String.valueOf(ledgerId);
+        // Scenario: a registered listener gets the initial metadata and the update after a data-changed event.
+        LinkedBlockingQueue<LedgerMetadata> changes = new LinkedBlockingQueue<>();
+        LedgerMetadataListener listener = (ledgerId1, metadata) -> changes.add(metadata);
+
+        metadata.setVersion(new LongVersion(1234L));
+        Stat stat = mock(Stat.class);
+        when(stat.getVersion()).thenReturn(1234);
+        when(stat.getCtime()).thenReturn(metadata.getCtime());
+        mockZkGetData(
+            ledgerStr, true,
+            KeeperException.Code.OK.intValue(), metadata.serialize(), stat);
+
+        ledgerManager.registerLedgerMetadataListener(ledgerId, listener);
+
+        // the listener is notified with the result of the first get
+        LedgerMetadata change1 = changes.take();
+        assertEquals(metadata, change1);
+        verify(mockZk, times(1))
+            .getData(anyString(), any(Watcher.class), any(DataCallback.class), any());
+
+        // a watcher is registered to receive watched events
+        assertTrue(watchers.containsKey(ledgerStr));
+        Set<Watcher> watcherSet1 = watchers.get(ledgerStr);
+        assertEquals(1, watcherSet1.size());
+        Watcher registeredWatcher1 = watcherSet1.stream().findFirst().get();
+
+        // mock getData to return updated metadata
+        metadata.setVersion(new LongVersion(1235L));
+        when(stat.getVersion()).thenReturn(1235);
+        mockZkGetData(
+            ledgerStr, true,
+            KeeperException.Code.OK.intValue(), metadata.serialize(), stat);
+
+        // fire the watched event
+        notifyWatchedEvent(
+            EventType.NodeDataChanged, KeeperState.SyncConnected, ledgerStr);
+
+        // the listener should receive the updated metadata
+        LedgerMetadata change2 = changes.take();
+        assertEquals(metadata, change2);
+        verify(mockZk, times(2))
+            .getData(anyString(), any(Watcher.class), any(DataCallback.class), any());
+
+        // after the listener receives the updated metadata, a watcher should be registered
+        // again for subsequent changes.
+        assertTrue(watchers.containsKey(ledgerStr));
+        Set<Watcher> watcherSet2 = watchers.get(ledgerStr);
+        assertEquals(1, watcherSet2.size());
+        Watcher registeredWatcher2 = watcherSet2.stream().findFirst().get();
+
+        // the zookeeper watchers are the same instance, since there is only one giant watcher per ledger manager.
+        assertSame(registeredWatcher1, registeredWatcher2);
+
+        // verify scheduler: two tasks were submitted and no delayed task was scheduled
+        verify(scheduler, times(2)).submit(any(Runnable.class));
+        verify(scheduler, times(0))
+            .schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
+    }
+
+    @Test
+    public void testLedgerMetadataListenerOnLedgerDeleted() throws Exception {
+        long ledgerId = System.currentTimeMillis();
+        String ledgerStr = String.valueOf(ledgerId);
+        // Scenario: the re-read triggered by a data-changed event fails with NONODE, i.e. the ledger is gone.
+        LinkedBlockingQueue<Optional<LedgerMetadata>> changes = new LinkedBlockingQueue<>();
+        LedgerMetadataListener listener = (ledgerId1, metadata) -> changes.add(Optional.ofNullable(metadata));
+
+        metadata.setVersion(new LongVersion(1234L));
+        Stat stat = mock(Stat.class);
+        when(stat.getVersion()).thenReturn(1234);
+        when(stat.getCtime()).thenReturn(metadata.getCtime());
+        mockZkGetData(
+            ledgerStr, true,
+            KeeperException.Code.OK.intValue(), metadata.serialize(), stat);
+
+        ledgerManager.registerLedgerMetadataListener(ledgerId, listener);
+        assertTrue(ledgerManager.listeners.containsKey(ledgerId));
+
+        // the listener is notified with the result of the first get
+        LedgerMetadata change1 = changes.take().get();
+        assertEquals(metadata, change1);
+        verify(mockZk, times(1))
+            .getData(anyString(), any(Watcher.class), any(DataCallback.class), any());
+
+        // a watcher is registered to receive watched events
+        assertTrue(watchers.containsKey(ledgerStr));
+
+        // mock getData to simulate that the ledger has been deleted
+        mockZkGetData(
+            ledgerStr, true,
+            KeeperException.Code.NONODE.intValue(), null, null);
+
+        // fire the watched event
+        notifyWatchedEvent(
+            EventType.NodeDataChanged, KeeperState.SyncConnected, ledgerStr);
+
+        // the listener is notified with a null metadata and is removed from the listener set
+        Optional<LedgerMetadata> change2 = changes.take();
+        assertFalse(change2.isPresent());
+        assertFalse(ledgerManager.listeners.containsKey(ledgerId));
+
+        // verify scheduler: exactly one task was submitted and no delayed task was scheduled
+        verify(scheduler, times(1)).submit(any(Runnable.class));
+        verify(scheduler, times(0)).schedule(
+            any(Runnable.class), anyLong(), any(TimeUnit.class));
+
+        // no watcher remains registered for the ledger
+        assertFalse(watchers.containsKey(ledgerStr));
+    }
+
+    @Test
+    public void testLedgerMetadataListenerOnLedgerDeletedEvent() throws Exception {
+        long ledgerId = System.currentTimeMillis();
+        String ledgerStr = String.valueOf(ledgerId);
+        // Scenario: a NodeDeleted event arrives, so the deletion is reported without reading the ledger again.
+        LinkedBlockingQueue<Optional<LedgerMetadata>> changes = new LinkedBlockingQueue<>();
+        LedgerMetadataListener listener = (ledgerId1, metadata) -> changes.add(Optional.ofNullable(metadata));
+
+        metadata.setVersion(new LongVersion(1234L));
+        Stat stat = mock(Stat.class);
+        when(stat.getVersion()).thenReturn(1234);
+        when(stat.getCtime()).thenReturn(metadata.getCtime());
+        mockZkGetData(
+            ledgerStr, true,
+            KeeperException.Code.OK.intValue(), metadata.serialize(), stat);
+
+        ledgerManager.registerLedgerMetadataListener(ledgerId, listener);
+        assertTrue(ledgerManager.listeners.containsKey(ledgerId));
+
+        // the listener is notified with the result of the first get
+        LedgerMetadata change1 = changes.take().get();
+        assertEquals(metadata, change1);
+        verify(mockZk, times(1))
+            .getData(anyString(), any(Watcher.class), any(DataCallback.class), any());
+
+        // a watcher is registered to receive watched events
+        assertTrue(watchers.containsKey(ledgerStr));
+
+        // fire the node-deleted event
+        notifyWatchedEvent(
+            EventType.NodeDeleted, KeeperState.SyncConnected, ledgerStr);
+
+        // the listener is notified with a null change
+        Optional<LedgerMetadata> change2 = changes.take();
+        assertFalse(change2.isPresent());
+        // no further `getData` is issued.
+        verify(mockZk, times(1))
+            .getData(anyString(), any(Watcher.class), any(DataCallback.class), any());
+        // the listener is automatically unregistered after the ledger is deleted.
+        assertFalse(ledgerManager.listeners.containsKey(ledgerId));
+    }
+
+    @Test
+    public void testLedgerMetadataListenerOnRetries() throws Exception {
+        long ledgerId = System.currentTimeMillis();
+        String ledgerStr = String.valueOf(ledgerId);
+        // Scenario: the first read fails with SESSIONEXPIRED and is retried after the connect backoff.
+        LinkedBlockingQueue<LedgerMetadata> changes = new LinkedBlockingQueue<>();
+        LedgerMetadataListener listener = (ledgerId1, metadata) -> changes.add(metadata);
+
+        metadata.setVersion(new LongVersion(1234L));
+        Stat stat = mock(Stat.class);
+        when(stat.getVersion()).thenReturn(1234);
+        when(stat.getCtime()).thenReturn(metadata.getCtime());
+
+        // fail the first get, so that the ledger manager retries the read.
+        mockZkGetData(
+            ledgerStr, true,
+            KeeperException.Code.SESSIONEXPIRED.intValue(), null, null);
+
+        ledgerManager.registerLedgerMetadataListener(ledgerId, listener);
+        assertTrue(ledgerManager.listeners.containsKey(ledgerId));
+
+        // the listener has not been notified of any update yet
+        assertNull(changes.poll());
+        // a retry task is scheduled
+        verify(scheduler, times(1))
+            .schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
+        // zookeeper has been called once
+        verify(mockZk, times(1))
+            .getData(anyString(), any(Watcher.class), any(DataCallback.class), any());
+        // no watcher is registered since the getData call failed
+        assertFalse(watchers.containsKey(ledgerStr));
+
+        // mock getData to return a valid response
+        mockZkGetData(
+            ledgerStr, true,
+            KeeperException.Code.OK.intValue(), metadata.serialize(), stat);
+
+        schedulerController.advance(Duration.ofMillis(ZK_CONNECT_BACKOFF_MS));
+
+        // the listener is notified once the retried get succeeds
+        LedgerMetadata change = changes.take();
+        assertEquals(metadata, change);
+        verify(mockZk, times(2))
+            .getData(anyString(), any(Watcher.class), any(DataCallback.class), any());
+
+        // a watcher is registered after the successful `getData`
+        assertTrue(watchers.containsKey(ledgerStr));
+    }
+
+    @Test
+    public void testLedgerMetadataListenerOnSessionExpired() throws Exception {
+        long ledgerId = System.currentTimeMillis();
+        String ledgerStr = String.valueOf(ledgerId);
+        // Scenario: a session-expired event makes the ledger manager read the metadata and watch again.
+        LinkedBlockingQueue<LedgerMetadata> changes = new LinkedBlockingQueue<>();
+        LedgerMetadataListener listener = (ledgerId1, metadata) -> changes.add(metadata);
+
+        metadata.setVersion(new LongVersion(1234L));
+        Stat stat = mock(Stat.class);
+        when(stat.getVersion()).thenReturn(1234);
+        when(stat.getCtime()).thenReturn(metadata.getCtime());
+        mockZkGetData(
+            ledgerStr, true,
+            KeeperException.Code.OK.intValue(), metadata.serialize(), stat);
+
+        ledgerManager.registerLedgerMetadataListener(ledgerId, listener);
+
+        // the listener is notified with the result of the first get
+        LedgerMetadata change1 = changes.take();
+        assertEquals(metadata, change1);
+        verify(mockZk, times(1))
+            .getData(anyString(), any(Watcher.class), any(DataCallback.class), any());
+
+        // a watcher is registered to receive watched events
+        assertTrue(watchers.containsKey(ledgerStr));
+        Set<Watcher> watcherSet1 = watchers.get(ledgerStr);
+        assertEquals(1, watcherSet1.size());
+        Watcher registeredWatcher1 = watcherSet1.stream().findFirst().get();
+
+        // simulate a session expiry
+        notifyWatchedEvent(
+            EventType.None, KeeperState.Expired, ledgerStr);
+
+        // the ledger manager reads the metadata again
+        LedgerMetadata change2 = changes.take();
+        assertEquals(metadata, change2);
+        verify(mockZk, times(2))
+            .getData(anyString(), any(Watcher.class), any(DataCallback.class), any());
+
+        // a watcher is registered again to receive watched events
+        assertTrue(watchers.containsKey(ledgerStr));
+        Set<Watcher> watcherSet2 = watchers.get(ledgerStr);
+        assertEquals(1, watcherSet2.size());
+        Watcher registeredWatcher2 = watcherSet2.stream().findFirst().get();
+        // the very same watcher instance is used before and after the expiry
+        assertSame(registeredWatcher1, registeredWatcher2);
+    }
+
+    @Test
+    public void testUnregisterLedgerMetadataListener() throws Exception {
+        long ledgerId = System.currentTimeMillis();
+        String ledgerStr = String.valueOf(ledgerId);
+        // Scenario: after the listener is unregistered, a data-changed event triggers no further read.
+        LinkedBlockingQueue<LedgerMetadata> changes = new LinkedBlockingQueue<>();
+        LedgerMetadataListener listener = (ledgerId1, metadata) -> changes.add(metadata);
+
+        metadata.setVersion(new LongVersion(1234L));
+        Stat stat = mock(Stat.class);
+        when(stat.getVersion()).thenReturn(1234);
+        when(stat.getCtime()).thenReturn(metadata.getCtime());
+        mockZkGetData(
+            ledgerStr, true,
+            KeeperException.Code.OK.intValue(), metadata.serialize(), stat);
+
+        ledgerManager.registerLedgerMetadataListener(ledgerId, listener);
+        assertTrue(ledgerManager.listeners.containsKey(ledgerId));
+
+        // the listener is notified with the result of the first get
+        LedgerMetadata change1 = changes.take();
+        assertEquals(metadata, change1);
+        verify(mockZk, times(1))
+            .getData(anyString(), any(Watcher.class), any(DataCallback.class), any());
+
+        // a watcher is registered to receive watched events
+        assertTrue(watchers.containsKey(ledgerStr));
+        Set<Watcher> watcherSet1 = watchers.get(ledgerStr);
+        assertEquals(1, watcherSet1.size());
+        Watcher registeredWatcher1 = watcherSet1.stream().findFirst().get();
+
+        // mock getData to return updated metadata
+        metadata.setVersion(new LongVersion(1235L));
+        when(stat.getVersion()).thenReturn(1235);
+        mockZkGetData(
+            ledgerStr, true,
+            KeeperException.Code.OK.intValue(), metadata.serialize(), stat);
+
+        // unregister the listener
+        ledgerManager.unregisterLedgerMetadataListener(ledgerId, listener);
+        assertFalse(ledgerManager.listeners.containsKey(ledgerId));
+
+        // fire the watched event
+        notifyWatchedEvent(
+            EventType.NodeDataChanged, KeeperState.SyncConnected, ledgerStr);
+
+        // since the listener is already unregistered, no more `getData` is issued.
+        assertNull(changes.poll());
+        verify(mockZk, times(1))
+            .getData(anyString(), any(Watcher.class), any(DataCallback.class), any());
+    }
+
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestCallbacks.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestCallbacks.java
index 5d9f99b..7c52e3a 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestCallbacks.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestCallbacks.java
@@ -20,11 +20,13 @@
  */
 package org.apache.bookkeeper.test;
 
+import com.google.common.util.concurrent.AbstractFuture;
+
+import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
-import com.google.common.util.concurrent.AbstractFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,13 +38,13 @@ public class TestCallbacks {
     private static final Logger logger = LoggerFactory.getLogger(TestCallbacks.class);
 
     public static class GenericCallbackFuture<T>
-        extends AbstractFuture<T> implements GenericCallback<T> {
+        extends CompletableFuture<T> implements GenericCallback<T> {
         @Override
         public void operationComplete(int rc, T value) {
             if (rc != BKException.Code.OK) {
-                setException(BKException.create(rc));
+                completeExceptionally(BKException.create(rc)); // non-OK rc: fail the future with a BKException
             } else {
-                set(value);
+                complete(value); // OK rc: complete the future with the callback value
             }
         }
     }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/MockZooKeeperTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/MockZooKeeperTestCase.java
new file mode 100644
index 0000000..eb28229
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/MockZooKeeperTestCase.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.zookeeper;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+
+import com.google.common.collect.Maps;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.bookkeeper.client.api.BKException.Code;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.zookeeper.AsyncCallback.DataCallback;
+import org.apache.zookeeper.AsyncCallback.StatCallback;
+import org.apache.zookeeper.AsyncCallback.StringCallback;
+import org.apache.zookeeper.AsyncCallback.VoidCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.powermock.api.mockito.PowerMockito;
+
+/**
+ * A test base that provides mocked zookeeper.
+ */
+public abstract class MockZooKeeperTestCase {
+
+    /** Watchers captured by the mocked {@code getData} calls, keyed by the path they were set on. */
+    protected final ConcurrentMap<String, Set<Watcher>> watchers = Maps.newConcurrentMap();
+    protected ZooKeeper mockZk;
+
+    /** Creates the mocked ZooKeeper handle and puts {@link ZkUtils} under PowerMock static mocking. */
+    protected void setup() throws Exception {
+        this.mockZk = mock(ZooKeeper.class);
+        PowerMockito.mockStatic(ZkUtils.class);
+    }
+
+    /** Records {@code watcher} under {@code path}; a null watcher (no watch requested) is ignored. */
+    private void addWatcher(String path, Watcher watcher) {
+        if (null == watcher) {
+            return;
+        }
+        // computeIfAbsent creates the per-path set atomically, so registrations racing on the same
+        // path cannot replace each other's set the way a separate get-then-put on the map could.
+        watchers.computeIfAbsent(path, ignored -> new HashSet<>()).add(watcher);
+    }
+
+    /**
+     * Stubs {@code ZkUtils.asyncCreateFullPathOptimistic} for the expected path and create mode so that
+     * its callback is invoked right away with {@code retCode} and {@code retCreatedZnodeName}.
+     */
+    protected void mockZkUtilsAsyncCreateFullPathOptimistic(
+        String expectedLedgerPath,
+        CreateMode expectedCreateMode,
+        int retCode,
+        String retCreatedZnodeName
+    ) throws Exception {
+        PowerMockito.doAnswer(invocationOnMock -> {
+            String path = invocationOnMock.getArgument(1);
+            StringCallback callback = invocationOnMock.getArgument(5);
+            Object ctx = invocationOnMock.getArgument(6);
+            callback.processResult(retCode, path, ctx, retCreatedZnodeName);
+            return null;
+        }).when(
+            ZkUtils.class,
+            "asyncCreateFullPathOptimistic",
+            eq(mockZk),
+            eq(expectedLedgerPath),
+            any(byte[].class),
+            anyList(),
+            eq(expectedCreateMode),
+            any(StringCallback.class),
+            any());
+    }
+
+    /**
+     * Stubs the async {@code delete} on the mocked ZooKeeper for the expected path and version so that
+     * its callback is invoked right away with {@code retCode}.
+     */
+    protected void mockZkDelete(
+        String expectedLedgerPath,
+        int expectedVersion,
+        int retCode
+    ) throws Exception {
+        doAnswer(invocationOnMock -> {
+            String path = invocationOnMock.getArgument(0);
+            VoidCallback callback = invocationOnMock.getArgument(2);
+            Object ctx = invocationOnMock.getArgument(3);
+            callback.processResult(retCode, path, ctx);
+            return null;
+        }).when(mockZk).delete(
+            eq(expectedLedgerPath),
+            eq(expectedVersion),
+            any(VoidCallback.class),
+            any());
+    }
+
+    /**
+     * Stubs {@code ZkUtils.asyncDeleteFullPathOptimistic} for the expected path and znode version so that
+     * its callback is invoked right away with {@code retCode}; the callback context is always null.
+     */
+    protected void mockZkUtilsAsyncDeleteFullPathOptimistic(
+        String expectedLedgerPath,
+        int expectedZnodeVersion,
+        int retCode
+    ) throws Exception {
+        PowerMockito.doAnswer(invocationOnMock -> {
+            String path = invocationOnMock.getArgument(1);
+            VoidCallback callback = invocationOnMock.getArgument(3);
+            callback.processResult(retCode, path, null);
+            return null;
+        }).when(
+            ZkUtils.class,
+            "asyncDeleteFullPathOptimistic",
+            eq(mockZk),
+            eq(expectedLedgerPath),
+            eq(expectedZnodeVersion),
+            any(VoidCallback.class),
+            eq(expectedLedgerPath));
+    }
+
+    /**
+     * Stubs the async {@code getData} on the mocked ZooKeeper so that its callback is invoked right away
+     * with {@code retCode}, {@code retData} and {@code retStat}. The watcher passed by the caller is
+     * recorded in {@link #watchers} only when {@code retCode} equals {@code Code.OK}.
+     */
+    protected void mockZkGetData(
+        String expectedLedgerPath,
+        boolean expectedWatcher,
+        int retCode,
+        byte[] retData,
+        Stat retStat
+    ) throws Exception {
+        doAnswer(invocationOnMock -> {
+            String path = invocationOnMock.getArgument(0);
+            Watcher watcher = invocationOnMock.getArgument(1);
+            DataCallback callback = invocationOnMock.getArgument(2);
+            Object ctx = invocationOnMock.getArgument(3);
+            // NOTE(review): retCode is a ZooKeeper result code, yet Code here is BookKeeper's
+            // BKException.Code (see imports); this relies on both OK values being equal - confirm.
+            if (Code.OK == retCode) {
+                addWatcher(path, watcher);
+            }
+
+            callback.processResult(retCode, path, ctx, retData, retStat);
+            return null;
+        }).when(mockZk).getData(
+            eq(expectedLedgerPath),
+            // match any watcher when one is expected, otherwise only a call that passes no watcher
+            expectedWatcher ? any(Watcher.class) : eq(null),
+            any(DataCallback.class),
+            any());
+    }
+
+    /**
+     * Stubs the async {@code setData} on the mocked ZooKeeper for the expected path, payload and version
+     * so that its callback is invoked right away with {@code retCode} and {@code retStat}.
+     */
+    protected void mockZkSetData(
+        String expectedLedgerPath,
+        byte[] expectedBytes,
+        int expectedVersion,
+        int retCode,
+        Stat retStat
+    ) throws Exception {
+        doAnswer(invocationOnMock -> {
+            String path = invocationOnMock.getArgument(0);
+            StatCallback callback = invocationOnMock.getArgument(3);
+            Object ctx = invocationOnMock.getArgument(4);
+            callback.processResult(retCode, path, ctx, retStat);
+            return null;
+        }).when(mockZk).setData(
+            eq(expectedLedgerPath),
+            eq(expectedBytes),
+            eq(expectedVersion),
+            any(StatCallback.class),
+            any());
+    }
+
+    /**
+     * Removes the watchers recorded for {@code path} and fires the given event on each of them.
+     *
+     * @return {@code false} if no watcher was recorded for {@code path}, {@code true} otherwise
+     */
+    protected boolean notifyWatchedEvent(EventType eventType,
+                                         KeeperState keeperState,
+                                         String path) {
+        Set<Watcher> watcherSet = watchers.remove(path);
+        if (null == watcherSet) {
+            return false;
+        }
+        WatchedEvent event = new WatchedEvent(eventType, keeperState, path);
+        for (Watcher watcher : watcherSet) {
+            watcher.process(event);
+        }
+        return true;
+    }
+}

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.