You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/09/10 23:31:17 UTC

[incubator-pulsar] branch master updated: Readd MockBookKeeper to Pulsar (#2544)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 69571f8  Readd MockBookKeeper to Pulsar (#2544)
69571f8 is described below

commit 69571f8ef67b99d66c7d405e17870100d0aa5080
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Tue Sep 11 01:31:14 2018 +0200

    Readd MockBookKeeper to Pulsar (#2544)
    
    These mocks were moved out of pulsar to the bookkeeper project a few
    months ago. While it is good to have mocks generally available for
    bookkeeper, if the mock is not in the pulsar code base and we want to
    change the behaviour of the mock for a specific case, we need to wait
    for a bookkeeper release cycle to do so. It's better to have the mock
    in Pulsar, so we can bend it to our needs.
---
 .../bookkeeper/client/PulsarMockBookKeeper.java    | 284 +++++++++++++++++++++
 .../bookkeeper/client/PulsarMockLedgerHandle.java  | 243 ++++++++++++++++++
 .../bookkeeper/client/PulsarMockReadHandle.java    | 125 +++++++++
 .../mledger/impl/ManagedLedgerErrorsTest.java      |   4 +-
 .../bookkeeper/test/MockedBookKeeperTestCase.java  |   6 +-
 .../broker/MockedBookKeeperClientFactory.java      |  18 +-
 .../broker/auth/MockedPulsarServiceBaseTest.java   |  31 ++-
 .../PersistentDispatcherFailoverConsumerTest.java  |   3 +-
 .../pulsar/broker/service/PersistentTopicTest.java |   3 +-
 .../pulsar/broker/service/ServerCnxTest.java       |   3 +-
 .../impl/BlobStoreManagedLedgerOffloaderTest.java  |   6 +-
 11 files changed, 704 insertions(+), 22 deletions(-)

diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java b/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
new file mode 100644
index 0000000..f3689e9
--- /dev/null
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
+import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
+import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
+import org.apache.bookkeeper.client.api.OpenBuilder;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.client.impl.OpenBuilderBase;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Mocked version of BookKeeper client that keeps all ledgers data in memory.
+ *
+ * <p>This mocked client is meant to be used in unit tests for applications using the BookKeeper API.
+ */
+public class PulsarMockBookKeeper extends BookKeeper {
+
+    final ExecutorService executor;
+    final ZooKeeper zkc;
+
+    @Override
+    public ZooKeeper getZkHandle() {
+        return zkc;
+    }
+
+    @Override
+    public ClientConfiguration getConf() {
+        return super.getConf();
+    }
+
+    Map<Long, PulsarMockLedgerHandle> ledgers = new ConcurrentHashMap<>();
+    AtomicLong sequence = new AtomicLong(3);
+
+    CompletableFuture<Void> defaultResponse = CompletableFuture.completedFuture(null);
+    List<CompletableFuture<Void>> failures = new ArrayList<>();
+
+    public PulsarMockBookKeeper(ZooKeeper zkc, ExecutorService executor) throws Exception {
+        this.zkc = zkc;
+        this.executor = executor;
+    }
+
+    @Override
+    public LedgerHandle createLedger(DigestType digestType, byte passwd[])
+            throws BKException, InterruptedException {
+        return createLedger(3, 2, digestType, passwd);
+    }
+
+    @Override
+    public LedgerHandle createLedger(int ensSize, int qSize, DigestType digestType, byte passwd[])
+            throws BKException, InterruptedException {
+        return createLedger(ensSize, qSize, qSize, digestType, passwd);
+    }
+
+    @Override
+    public void asyncCreateLedger(int ensSize, int writeQuorumSize, int ackQuorumSize, final DigestType digestType,
+            final byte[] passwd, final CreateCallback cb, final Object ctx, Map<String, byte[]> properties) {
+        getProgrammedFailure().thenComposeAsync((res) -> {
+                try {
+                    long id = sequence.getAndIncrement();
+                    log.info("Creating ledger {}", id);
+                    PulsarMockLedgerHandle lh = new PulsarMockLedgerHandle(PulsarMockBookKeeper.this, id, digestType, passwd);
+                    ledgers.put(id, lh);
+                    return FutureUtils.value(lh);
+                } catch (Throwable t) {
+                    return FutureUtils.exception(t);
+                }
+            }, executor).whenCompleteAsync((lh, exception) -> {
+                    if (exception != null) {
+                        cb.createComplete(getExceptionCode(exception), null, ctx);
+                    } else {
+                        cb.createComplete(BKException.Code.OK, lh, ctx);
+                    }
+                }, executor);
+    }
+
+    @Override
+    public LedgerHandle createLedger(int ensSize, int writeQuorumSize, int ackQuorumSize, DigestType digestType,
+            byte[] passwd) throws BKException, InterruptedException {
+        checkProgrammedFail();
+
+        try {
+            long id = sequence.getAndIncrement();
+            log.info("Creating ledger {}", id);
+            PulsarMockLedgerHandle lh = new PulsarMockLedgerHandle(this, id, digestType, passwd);
+            ledgers.put(id, lh);
+            return lh;
+        } catch (Throwable t) {
+            log.error("Exception:", t);
+            return null;
+        }
+    }
+
+    @Override
+    public void asyncCreateLedger(int ensSize, int qSize, DigestType digestType, byte[] passwd, CreateCallback cb,
+            Object ctx) {
+        asyncCreateLedger(ensSize, qSize, qSize, digestType, passwd, cb, ctx, Collections.emptyMap());
+    }
+
+    @Override
+    public void asyncOpenLedger(long lId, DigestType digestType, byte[] passwd, OpenCallback cb, Object ctx) {
+        getProgrammedFailure().thenComposeAsync((res) -> {
+                PulsarMockLedgerHandle lh = ledgers.get(lId);
+                if (lh == null) {
+                    return FutureUtils.exception(new BKException.BKNoSuchLedgerExistsException());
+                } else if (lh.digest != digestType) {
+                    return FutureUtils.exception(new BKException.BKDigestMatchException());
+                } else if (!Arrays.equals(lh.passwd, passwd)) {
+                    return FutureUtils.exception(new BKException.BKUnauthorizedAccessException());
+                } else {
+                    return FutureUtils.value(lh);
+                }
+            }, executor).whenCompleteAsync((ledger, exception) -> {
+                    if (exception != null) {
+                        cb.openComplete(getExceptionCode(exception), null, ctx);
+                    } else {
+                        cb.openComplete(BKException.Code.OK, ledger, ctx);
+                    }
+                }, executor);
+    }
+
+    @Override
+    public void asyncOpenLedgerNoRecovery(long lId, DigestType digestType, byte[] passwd, OpenCallback cb, Object ctx) {
+        asyncOpenLedger(lId, digestType, passwd, cb, ctx);
+    }
+
+    @Override
+    public void asyncDeleteLedger(long lId, DeleteCallback cb, Object ctx) {
+        getProgrammedFailure().thenComposeAsync((res) -> {
+                if (ledgers.containsKey(lId)) {
+                    ledgers.remove(lId);
+                    return FutureUtils.value(null);
+                } else {
+                    return FutureUtils.exception(new BKException.BKNoSuchLedgerExistsException());
+                }
+            }, executor).whenCompleteAsync((res, exception) -> {
+                    if (exception != null) {
+                        cb.deleteComplete(getExceptionCode(exception), ctx);
+                    } else {
+                        cb.deleteComplete(BKException.Code.OK, ctx);
+                    }
+                }, executor);
+    }
+
+    @Override
+    public void deleteLedger(long lId) throws InterruptedException, BKException {
+        checkProgrammedFail();
+
+        if (!ledgers.containsKey(lId)) {
+            throw BKException.create(BKException.Code.NoSuchLedgerExistsException);
+        }
+
+        ledgers.remove(lId);
+    }
+
+    @Override
+    public void close() throws InterruptedException, BKException {
+        shutdown();
+    }
+
+    @Override
+    public OpenBuilder newOpenLedgerOp() {
+        return new OpenBuilderBase() {
+            @Override
+            public CompletableFuture<ReadHandle> execute() {
+                return getProgrammedFailure().thenCompose(
+                        (res) -> {
+                            if (!validate()) {
+                                return FutureUtils.exception(new BKException.BKNoSuchLedgerExistsException());
+                            }
+
+                            PulsarMockLedgerHandle lh = ledgers.get(ledgerId);
+                            if (lh == null) {
+                                return FutureUtils.exception(new BKException.BKNoSuchLedgerExistsException());
+                            } else if (lh.digest != DigestType.fromApiDigestType(digestType)) {
+                                return FutureUtils.exception(new BKException.BKDigestMatchException());
+                            } else if (!Arrays.equals(lh.passwd, password)) {
+                                return FutureUtils.exception(new BKException.BKUnauthorizedAccessException());
+                            } else {
+                                return FutureUtils.value(new PulsarMockReadHandle(PulsarMockBookKeeper.this, ledgerId,
+                                                                                  lh.getLedgerMetadata(), lh.entries));
+                            }
+                        });
+            }
+        };
+    }
+
+    public void shutdown() {
+        try {
+            super.close();
+        } catch (Exception e) {
+        }
+        synchronized (this) {
+            defaultResponse = FutureUtils.exception(new BKException.BKClientClosedException());
+        }
+        for (PulsarMockLedgerHandle ledger : ledgers.values()) {
+            ledger.entries.clear();
+        }
+
+        ledgers.clear();
+    }
+
+    public Set<Long> getLedgers() {
+        return ledgers.keySet();
+    }
+
+    void checkProgrammedFail() throws BKException, InterruptedException {
+        try {
+            getProgrammedFailure().get();
+        } catch (ExecutionException ee) {
+            if (ee.getCause() instanceof BKException) {
+                throw (BKException)ee.getCause();
+            } else {
+                throw new BKException.BKUnexpectedConditionException();
+            }
+        }
+    }
+
+    synchronized CompletableFuture<Void> getProgrammedFailure() {
+        return failures.isEmpty() ? defaultResponse : failures.remove(0);
+    }
+
+    public void failNow(int rc) {
+        failAfter(0, rc);
+    }
+
+    public void failAfter(int steps, int rc) {
+        promiseAfter(steps).completeExceptionally(BKException.create(rc));
+    }
+
+    public synchronized CompletableFuture<Void> promiseAfter(int steps) {
+        while (failures.size() <= steps) {
+            failures.add(defaultResponse);
+        }
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        failures.set(steps, promise);
+        return promise;
+    }
+
+    static int getExceptionCode(Throwable t) {
+        if (t instanceof BKException) {
+            return ((BKException) t).getCode();
+        } else if (t.getCause() != null) {
+            return getExceptionCode(t.getCause());
+        } else {
+            return BKException.Code.UnexpectedConditionException;
+        }
+    }
+
+    private static final Logger log = LoggerFactory.getLogger(PulsarMockBookKeeper.class);
+}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java b/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
new file mode 100644
index 0000000..3037a78
--- /dev/null
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import com.google.common.collect.Lists;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.security.GeneralSecurityException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.Enumeration;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
+import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.client.api.WriteFlag;
+import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Mock BK {@link LedgerHandle}. Used by {@link PulsarMockBookKeeper}.
+ */
+public class PulsarMockLedgerHandle extends LedgerHandle {
+
+    final ArrayList<LedgerEntryImpl> entries = Lists.newArrayList();
+    final PulsarMockBookKeeper bk;
+    final long id;
+    final DigestType digest;
+    final byte[] passwd;
+    final ReadHandle readHandle;
+    long lastEntry = -1;
+    boolean fenced = false;
+
+    PulsarMockLedgerHandle(PulsarMockBookKeeper bk, long id,
+                           DigestType digest, byte[] passwd) throws GeneralSecurityException {
+        super(bk, id, new LedgerMetadata(3, 3, 2, DigestType.MAC, "".getBytes()), DigestType.MAC, "".getBytes(),
+                EnumSet.noneOf(WriteFlag.class));
+        this.bk = bk;
+        this.id = id;
+        this.digest = digest;
+        this.passwd = Arrays.copyOf(passwd, passwd.length);
+
+        readHandle = new PulsarMockReadHandle(bk, id, getLedgerMetadata(), entries);
+    }
+
+    @Override
+    public void asyncClose(CloseCallback cb, Object ctx) {
+        bk.getProgrammedFailure().thenComposeAsync((res) -> {
+                fenced = true;
+                return FutureUtils.value(null);
+            }, bk.executor).whenCompleteAsync((res, exception) -> {
+                    if (exception != null) {
+                        cb.closeComplete(PulsarMockBookKeeper.getExceptionCode(exception), null, ctx);
+                    } else {
+                        cb.closeComplete(BKException.Code.OK, this, ctx);
+                    }
+                }, bk.executor);
+    }
+
+    @Override
+    public void asyncReadEntries(final long firstEntry, final long lastEntry, final ReadCallback cb, final Object ctx) {
+        bk.getProgrammedFailure().thenComposeAsync((res) -> {
+                log.debug("readEntries: first={} last={} total={}", firstEntry, lastEntry, entries.size());
+                final Queue<LedgerEntry> seq = new ArrayDeque<LedgerEntry>();
+                long entryId = firstEntry;
+                while (entryId <= lastEntry && entryId < entries.size()) {
+                    seq.add(new LedgerEntry(entries.get((int) entryId++).duplicate()));
+                }
+
+                log.debug("Entries read: {}", seq);
+
+                try {
+                    Thread.sleep(1);
+                } catch (InterruptedException e) {
+                }
+
+                Enumeration<LedgerEntry> entries = new Enumeration<LedgerEntry>() {
+                        @Override
+                        public boolean hasMoreElements() {
+                            return !seq.isEmpty();
+                        }
+
+                        @Override
+                        public LedgerEntry nextElement() {
+                            return seq.remove();
+                        }
+                    };
+                return FutureUtils.value(entries);
+            }).whenCompleteAsync((res, exception) -> {
+                    if (exception != null) {
+                        cb.readComplete(PulsarMockBookKeeper.getExceptionCode(exception), PulsarMockLedgerHandle.this, null, ctx);
+                    } else {
+                        cb.readComplete(BKException.Code.OK, PulsarMockLedgerHandle.this, res, ctx);
+                    }
+                }, bk.executor);
+    }
+
+    @Override
+    public long addEntry(byte[] data) throws InterruptedException, BKException {
+        try {
+            bk.checkProgrammedFail();
+        } catch (BKException e) {
+            fenced = true;
+            throw e;
+        }
+
+        if (fenced) {
+            throw BKException.create(BKException.Code.LedgerFencedException);
+        }
+
+        lastEntry = entries.size();
+        entries.add(LedgerEntryImpl.create(ledgerId, lastEntry, data.length, Unpooled.wrappedBuffer(data)));
+        return lastEntry;
+    }
+
+    @Override
+    public void asyncAddEntry(final byte[] data, final AddCallback cb, final Object ctx) {
+        asyncAddEntry(data, 0, data.length, cb, ctx);
+    }
+
+    @Override
+    public void asyncAddEntry(final byte[] data, final int offset, final int length, final AddCallback cb,
+            final Object ctx) {
+        asyncAddEntry(Unpooled.wrappedBuffer(data, offset, length), cb, ctx);
+    }
+
+    @Override
+    public void asyncAddEntry(final ByteBuf data, final AddCallback cb, final Object ctx) {
+        data.retain();
+        bk.getProgrammedFailure().thenComposeAsync((res) -> {
+                try {
+                    Thread.sleep(1);
+                } catch (InterruptedException e) {
+                }
+
+                if (fenced) {
+                    return FutureUtils.exception(new BKException.BKLedgerFencedException());
+                } else {
+                    lastEntry = entries.size();
+                    byte[] storedData = new byte[data.readableBytes()];
+                    data.readBytes(storedData);
+                    entries.add(LedgerEntryImpl.create(ledgerId, lastEntry,
+                                                       storedData.length, Unpooled.wrappedBuffer(storedData)));
+                    return FutureUtils.value(lastEntry);
+                }
+
+            }, bk.executor).whenCompleteAsync((entryId, exception) -> {
+                    data.release();
+                    if (exception != null) {
+                        fenced = true;
+                        cb.addComplete(PulsarMockBookKeeper.getExceptionCode(exception),
+                                       PulsarMockLedgerHandle.this, INVALID_ENTRY_ID, ctx);
+                    } else {
+                        cb.addComplete(BKException.Code.OK, PulsarMockLedgerHandle.this, entryId, ctx);
+                    }
+                });
+    }
+
+    @Override
+    public long getId() {
+        return ledgerId;
+    }
+
+    @Override
+    public long getLastAddConfirmed() {
+        return lastEntry;
+    }
+
+    @Override
+    public long getLength() {
+        long length = 0;
+        for (LedgerEntryImpl entry : entries) {
+            length += entry.getLength();
+        }
+
+        return length;
+    }
+
+
+    // ReadHandle interface
+    @Override
+    public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry) {
+        return readHandle.readAsync(firstEntry, lastEntry);
+    }
+
+    @Override
+    public CompletableFuture<LedgerEntries> readUnconfirmedAsync(long firstEntry, long lastEntry) {
+        return readHandle.readUnconfirmedAsync(firstEntry, lastEntry);
+    }
+
+    @Override
+    public CompletableFuture<Long> readLastAddConfirmedAsync() {
+        return readHandle.readLastAddConfirmedAsync();
+    }
+
+    @Override
+    public CompletableFuture<Long> tryReadLastAddConfirmedAsync() {
+        return readHandle.tryReadLastAddConfirmedAsync();
+    }
+
+    @Override
+    public boolean isClosed() {
+        return readHandle.isClosed();
+    }
+
+    @Override
+    public CompletableFuture<LastConfirmedAndEntry> readLastAddConfirmedAndEntryAsync(long entryId,
+                                                                                      long timeOutInMillis,
+                                                                                      boolean parallel) {
+        return readHandle.readLastAddConfirmedAndEntryAsync(entryId, timeOutInMillis, parallel);
+    }
+
+    private static final Logger log = LoggerFactory.getLogger(PulsarMockLedgerHandle.class);
+
+}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java b/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java
new file mode 100644
index 0000000..30bcf46
--- /dev/null
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.LedgerEntry;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
+import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
+
+/**
+ * Mock implementation of ReadHandle.
+ */
+@Slf4j
+class PulsarMockReadHandle implements ReadHandle {
+    private final PulsarMockBookKeeper bk;
+    private final long ledgerId;
+    private final LedgerMetadata metadata;
+    private final List<LedgerEntryImpl> entries;
+
+    PulsarMockReadHandle(PulsarMockBookKeeper bk, long ledgerId, LedgerMetadata metadata, List<LedgerEntryImpl> entries) {
+        this.bk = bk;
+        this.ledgerId = ledgerId;
+        this.metadata = metadata;
+        this.entries = entries;
+    }
+
+    @Override
+    public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry) {
+        return bk.getProgrammedFailure().thenComposeAsync((res) -> {
+                log.debug("readEntries: first={} last={} total={}", firstEntry, lastEntry, entries.size());
+                List<LedgerEntry> seq = new ArrayList<>();
+                long entryId = firstEntry;
+                while (entryId <= lastEntry && entryId < entries.size()) {
+                    seq.add(entries.get((int) entryId++).duplicate());
+                }
+                log.debug("Entries read: {}", seq);
+
+                return FutureUtils.value(LedgerEntriesImpl.create(seq));
+            });
+    }
+
+    @Override
+    public CompletableFuture<LedgerEntries> readUnconfirmedAsync(long firstEntry, long lastEntry) {
+        return readAsync(firstEntry, lastEntry);
+    }
+
+    @Override
+    public CompletableFuture<Long> readLastAddConfirmedAsync() {
+        return CompletableFuture.completedFuture(getLastAddConfirmed());
+    }
+
+    @Override
+    public CompletableFuture<Long> tryReadLastAddConfirmedAsync() {
+        return readLastAddConfirmedAsync();
+    }
+
+    @Override
+    public long getLastAddConfirmed() {
+        return entries.get(entries.size() - 1).getEntryId();
+    }
+
+    @Override
+    public long getLength() {
+        long length = 0;
+        for (LedgerEntryImpl entry : entries) {
+            length += entry.getLength();
+        }
+
+        return length;
+    }
+
+    @Override
+    public boolean isClosed() {
+        return metadata.isClosed();
+    }
+
+    @Override
+    public CompletableFuture<LastConfirmedAndEntry> readLastAddConfirmedAndEntryAsync(long entryId,
+                                                                                      long timeOutInMillis,
+                                                                                      boolean parallel) {
+        CompletableFuture<LastConfirmedAndEntry> promise = new CompletableFuture<>();
+        promise.completeExceptionally(new UnsupportedOperationException("Long poll not implemented"));
+        return promise;
+    }
+
+    // Handle interface
+    @Override
+    public long getId() {
+        return ledgerId;
+    }
+
+    @Override
+    public CompletableFuture<Void> closeAsync() {
+        return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public LedgerMetadata getLedgerMetadata() {
+        return metadata;
+    }
+}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java
index 35eb986..0f0d789 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java
@@ -435,8 +435,8 @@ public class ManagedLedgerErrorsTest extends MockedBookKeeperTestCase {
         ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("recoverLongTimeAfterMultipleWriteErrors");
         ManagedCursor cursor = ledger.openCursor("c1");
 
-        bkc.failNow(BKException.Code.BookieHandleNotAvailableException,
-                BKException.Code.BookieHandleNotAvailableException);
+        bkc.failAfter(0, BKException.Code.BookieHandleNotAvailableException);
+        bkc.failAfter(1, BKException.Code.BookieHandleNotAvailableException);
 
         CountDownLatch counter = new CountDownLatch(2);
         AtomicReference<ManagedLedgerException> ex = new AtomicReference<>();
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
index 251a4c8..4cb4903 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
@@ -22,7 +22,7 @@ import java.lang.reflect.Method;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
-import org.apache.bookkeeper.client.MockBookKeeper;
+import org.apache.bookkeeper.client.PulsarMockBookKeeper;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
@@ -49,7 +49,7 @@ public abstract class MockedBookKeeperTestCase {
     protected MockZooKeeper zkc;
 
     // BookKeeper related variables
-    protected MockBookKeeper bkc;
+    protected PulsarMockBookKeeper bkc;
     protected int numBookies;
 
     protected ManagedLedgerFactoryImpl factory;
@@ -121,7 +121,7 @@ public abstract class MockedBookKeeperTestCase {
 
         zkc.create("/ledgers/LAYOUT", "1\nflat:1".getBytes(), null, null);
 
-        bkc = new MockBookKeeper(zkc);
+        bkc = new PulsarMockBookKeeper(zkc, executor.chooseThread(this));
     }
 
     protected void stopBookKeeper() throws Exception {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MockedBookKeeperClientFactory.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MockedBookKeeperClientFactory.java
index 1922da7..728bc67 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MockedBookKeeperClientFactory.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MockedBookKeeperClientFactory.java
@@ -18,20 +18,33 @@
  */
 package org.apache.pulsar.broker;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.MockBookKeeper;
+import org.apache.bookkeeper.client.PulsarMockBookKeeper;
+
 import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class MockedBookKeeperClientFactory implements BookKeeperClientFactory {
+    private static final Logger log = LoggerFactory.getLogger(MockedBookKeeperClientFactory.class);
 
     private final BookKeeper mockedBk;
+    private final ExecutorService executor;
 
     public MockedBookKeeperClientFactory() {
         try {
-            mockedBk = new MockBookKeeper(null);
+            executor = Executors.newSingleThreadExecutor(
+                    new ThreadFactoryBuilder().setNameFormat("mock-bk-client-factory")
+                    .setUncaughtExceptionHandler((thread, ex) -> log.info("Uncaught exception", ex))
+                    .build());
+            mockedBk = new PulsarMockBookKeeper(null, executor);
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
@@ -48,5 +61,6 @@ public class MockedBookKeeperClientFactory implements BookKeeperClientFactory {
             mockedBk.close();
         } catch (BKException | InterruptedException e) {
         }
+        executor.shutdown();
     }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index bc98efa..5c64ac80 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -21,19 +21,23 @@ package org.apache.pulsar.broker.auth;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import java.io.IOException;
 import java.net.URI;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.MockBookKeeper;
+import org.apache.bookkeeper.client.PulsarMockBookKeeper;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.test.PortManager;
 import org.apache.bookkeeper.util.ZkUtils;
@@ -82,6 +86,7 @@ public abstract class MockedPulsarServiceBaseTest {
     protected final String configClusterName = "test";
 
     private SameThreadOrderedSafeExecutor sameThreadOrderedSafeExecutor;
+    private ExecutorService bkExecutor;
 
     public MockedPulsarServiceBaseTest() {
         resetConfig();
@@ -122,10 +127,14 @@ public abstract class MockedPulsarServiceBaseTest {
     }
 
     protected final void init() throws Exception {
-        mockZookKeeper = createMockZooKeeper();
-        mockBookKeeper = createMockBookKeeper(mockZookKeeper);
-
         sameThreadOrderedSafeExecutor = new SameThreadOrderedSafeExecutor();
+        bkExecutor = Executors.newSingleThreadExecutor(
+                new ThreadFactoryBuilder().setNameFormat("mock-pulsar-bk")
+                .setUncaughtExceptionHandler((thread, ex) -> log.info("Uncaught exception", ex))
+                .build());
+
+        mockZookKeeper = createMockZooKeeper();
+        mockBookKeeper = createMockBookKeeper(mockZookKeeper, bkExecutor);
 
         startBroker();
 
@@ -157,6 +166,9 @@ public abstract class MockedPulsarServiceBaseTest {
             if (sameThreadOrderedSafeExecutor != null) {
                 sameThreadOrderedSafeExecutor.shutdown();
             }
+            if (bkExecutor != null) {
+                bkExecutor.shutdown();
+            }
         } catch (Exception e) {
             log.warn("Failed to clean up mocked pulsar service:", e);
             throw e;
@@ -221,15 +233,16 @@ public abstract class MockedPulsarServiceBaseTest {
         return zk;
     }
 
-    public static NonClosableMockBookKeeper createMockBookKeeper(ZooKeeper zookeeper) throws Exception {
-        return spy(new NonClosableMockBookKeeper(new ClientConfiguration(), zookeeper));
+    public static NonClosableMockBookKeeper createMockBookKeeper(ZooKeeper zookeeper,
+                                                                 ExecutorService executor) throws Exception {
+        return spy(new NonClosableMockBookKeeper(zookeeper, executor));
     }
 
     // Prevent the MockBookKeeper instance from being closed when the broker is restarted within a test
-    public static class NonClosableMockBookKeeper extends MockBookKeeper {
+    public static class NonClosableMockBookKeeper extends PulsarMockBookKeeper {
 
-        public NonClosableMockBookKeeper(ClientConfiguration conf, ZooKeeper zk) throws Exception {
-            super(zk);
+        public NonClosableMockBookKeeper(ZooKeeper zk, ExecutorService executor) throws Exception {
+            super(zk, executor);
         }
 
         @Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index c1be8d1..9db657a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -113,7 +113,8 @@ public class PersistentDispatcherFailoverConsumerTest {
 
         ZooKeeper mockZk = createMockZooKeeper();
         doReturn(mockZk).when(pulsar).getZkClient();
-        doReturn(createMockBookKeeper(mockZk)).when(pulsar).getBookKeeperClient();
+        doReturn(createMockBookKeeper(mockZk, pulsar.getOrderedExecutor().chooseThread(0)))
+            .when(pulsar).getBookKeeperClient();
 
         configCacheService = mock(ConfigurationCacheService.class);
         @SuppressWarnings("unchecked")
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 8c175d7..b65b13f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -152,7 +152,8 @@ public class PersistentTopicTest {
 
         ZooKeeper mockZk = createMockZooKeeper();
         doReturn(mockZk).when(pulsar).getZkClient();
-        doReturn(createMockBookKeeper(mockZk)).when(pulsar).getBookKeeperClient();
+        doReturn(createMockBookKeeper(mockZk, pulsar.getOrderedExecutor().chooseThread(0)))
+            .when(pulsar).getBookKeeperClient();
 
         configCacheService = mock(ConfigurationCacheService.class);
         @SuppressWarnings("unchecked")
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index f746571..1145321 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -157,7 +157,8 @@ public class ServerCnxTest {
 
         ZooKeeper mockZk = createMockZooKeeper();
         doReturn(mockZk).when(pulsar).getZkClient();
-        doReturn(createMockBookKeeper(mockZk)).when(pulsar).getBookKeeperClient();
+        doReturn(createMockBookKeeper(mockZk, pulsar.getOrderedExecutor().chooseThread(0)))
+            .when(pulsar).getBookKeeperClient();
 
         configCacheService = mock(ConfigurationCacheService.class);
         ZooKeeperDataCache<Policies> zkDataCache = mock(ZooKeeperDataCache.class);
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
index eb88d37..8fe170d 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
@@ -43,7 +43,7 @@ import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.LedgerMetadata;
-import org.apache.bookkeeper.client.MockBookKeeper;
+import org.apache.bookkeeper.client.PulsarMockBookKeeper;
 import org.apache.bookkeeper.client.api.DigestType;
 import org.apache.bookkeeper.client.api.LedgerEntries;
 import org.apache.bookkeeper.client.api.LedgerEntry;
@@ -83,11 +83,11 @@ class BlobStoreManagedLedgerOffloaderTest extends BlobStoreTestBase {
     private static final int DEFAULT_BLOCK_SIZE = 5*1024*1024;
     private static final int DEFAULT_READ_BUFFER_SIZE = 1*1024*1024;
     final OrderedScheduler scheduler;
-    final MockBookKeeper bk;
+    final PulsarMockBookKeeper bk;
 
     BlobStoreManagedLedgerOffloaderTest() throws Exception {
         scheduler = OrderedScheduler.newSchedulerBuilder().numThreads(1).name("offloader").build();
-        bk = new MockBookKeeper(createMockZooKeeper());
+        bk = new PulsarMockBookKeeper(createMockZooKeeper(), scheduler.chooseThread(this));
     }
 
     private ReadHandle buildReadHandle() throws Exception {