You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/09/21 03:34:41 UTC

[pulsar] branch master updated: ManagedLedger should not attempt deferrable metadata operation while disconnected (#12101)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4bc3c40  ManagedLedger should not attempt deferrable metadata operation while disconnected (#12101)
4bc3c40 is described below

commit 4bc3c405a565b1c756b9b70ff02a63ea06c32c0d
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Sep 20 20:33:20 2021 -0700

    ManagedLedger should not attempt deferrable metadata operation while disconnected (#12101)
    
    * ManagedLedger should not attempt deferrable metadata operation while disconnected
    
    * Fixed compilation
    
    * Remove unused imports
    
    * Fixed ManagedLedgerWriter
    
    * Fixed compilation in presto connector
    
    * Also defer ledger trimmings
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  3 +-
 .../mledger/impl/ManagedLedgerFactoryImpl.java     | 43 +++++++++++++----
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 11 +++++
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 39 +++++++++++++++
 .../mledger/impl/ManagedLedgerErrorsTest.java      |  9 ++--
 .../impl/ManagedLedgerFactoryShutdownTest.java     |  4 +-
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 56 ++++++++++++++++++++++
 .../bookkeeper/test/BookKeeperClusterTestCase.java |  3 +-
 .../bookkeeper/test/MockedBookKeeperTestCase.java  |  3 +-
 .../pulsar/PulsarClusterMetadataTeardown.java      |  3 +-
 .../pulsar/broker/ManagedLedgerClientFactory.java  |  4 +-
 .../broker/storage/ManagedLedgerStorage.java       |  6 +--
 .../metadata/impl/FaultInjectionMetadataStore.java | 32 +++++++++++--
 .../bookkeeper/test/BookKeeperClusterTestCase.java | 37 +++++++-------
 .../bookkeeper/test/MockedBookKeeperTestCase.java  | 13 +++--
 .../pulsar/sql/presto/PulsarConnectorCache.java    |  7 ++-
 .../pulsar/testclient/ManagedLedgerWriter.java     |  9 +---
 .../coordinator/test/MockedBookKeeperTestCase.java | 18 +++----
 18 files changed, 221 insertions(+), 79 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 82b4cfc..516c2af 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -2653,7 +2653,8 @@ public class ManagedCursorImpl implements ManagedCursor {
 
     boolean shouldCloseLedger(LedgerHandle lh) {
         long now = clock.millis();
-        if ((lh.getLastAddConfirmed() >= config.getMetadataMaxEntriesPerLedger()
+        if (ledger.factory.isMetadataServiceAvailable() &&
+                (lh.getLastAddConfirmed() >= config.getMetadataMaxEntriesPerLedger()
                 || lastLedgerSwitchTimestamp < (now - config.getLedgerRolloverTimeout() * 1000))
                 && (STATE_UPDATER.get(this) != State.Closed && STATE_UPDATER.get(this) != State.Closing)) {
             // It's safe to modify the timestamp since this method will be only called from a callback, implying that
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 389a43b..59be069 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -38,6 +38,7 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+import lombok.Getter;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
@@ -77,6 +78,8 @@ import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.Stat;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.api.extended.SessionEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -106,6 +109,12 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
     //indicate whether shutdown() is called.
     private volatile boolean closed;
 
+    /**
+     * Keep a flag to indicate whether we're currently connected to the metadata service
+     */
+    @Getter
+    private boolean metadataServiceAvailable;
+
     private static class PendingInitializeManagedLedger {
 
         private final ManagedLedgerImpl ledger;
@@ -118,31 +127,31 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
 
     }
 
-    public ManagedLedgerFactoryImpl(MetadataStore metadataStore, ClientConfiguration bkClientConfiguration)
+    public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, ClientConfiguration bkClientConfiguration)
             throws Exception {
         this(metadataStore, bkClientConfiguration, new ManagedLedgerFactoryConfig());
     }
 
     @SuppressWarnings("deprecation")
-    public ManagedLedgerFactoryImpl(MetadataStore metadataStore, ClientConfiguration bkClientConfiguration,
+    public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, ClientConfiguration bkClientConfiguration,
                                     ManagedLedgerFactoryConfig config)
             throws Exception {
         this(metadataStore, new DefaultBkFactory(bkClientConfiguration),
                 true /* isBookkeeperManaged */, config, NullStatsLogger.INSTANCE);
     }
 
-    public ManagedLedgerFactoryImpl(MetadataStore metadataStore, BookKeeper bookKeeper)
+    public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, BookKeeper bookKeeper)
             throws Exception {
         this(metadataStore, bookKeeper, new ManagedLedgerFactoryConfig());
     }
 
-    public ManagedLedgerFactoryImpl(MetadataStore metadataStore, BookKeeper bookKeeper,
+    public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, BookKeeper bookKeeper,
                                     ManagedLedgerFactoryConfig config)
             throws Exception {
         this(metadataStore, (policyConfig) -> bookKeeper, config);
     }
 
-    public ManagedLedgerFactoryImpl(MetadataStore metadataStore,
+    public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
                                     BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperGroupFactory,
                                     ManagedLedgerFactoryConfig config)
             throws Exception {
@@ -150,7 +159,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
                 config, NullStatsLogger.INSTANCE);
     }
 
-    public ManagedLedgerFactoryImpl(MetadataStore metadataStore,
+    public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
                                     BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperGroupFactory,
                                     ManagedLedgerFactoryConfig config, StatsLogger statsLogger)
             throws Exception {
@@ -158,7 +167,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
                 config, statsLogger);
     }
 
-    private ManagedLedgerFactoryImpl(MetadataStore metadataStore,
+    private ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
                                      BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperGroupFactory,
                                      boolean isBookkeeperManaged,
                                      ManagedLedgerFactoryConfig config, StatsLogger statsLogger) throws Exception {
@@ -170,7 +179,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
                 .build();
         cacheEvictionExecutor = Executors
                 .newSingleThreadExecutor(new DefaultThreadFactory("bookkeeper-ml-cache-eviction"));
-
+        this.metadataServiceAvailable = true;
         this.bookkeeperFactory = bookKeeperGroupFactory;
         this.isBookkeeperManaged = isBookkeeperManaged;
         this.metadataStore = metadataStore;
@@ -190,6 +199,8 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
 
         cacheEvictionExecutor.execute(this::cacheEvictionTask);
         closed = false;
+
+        metadataStore.registerSessionListener(this::handleMetadataStoreNotification);
     }
 
     static class DefaultBkFactory implements BookkeeperFactoryForCustomEnsemblePlacementPolicy {
@@ -207,6 +218,22 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
         }
     }
 
+    private synchronized void handleMetadataStoreNotification(SessionEvent e) {
+        log.info("Received MetadataStore session event: {}", e);
+
+        switch (e) {
+            case ConnectionLost:
+            case SessionLost:
+                metadataServiceAvailable = false;
+                break;
+
+            case Reconnected:
+            case SessionReestablished:
+                metadataServiceAvailable = true;
+                break;
+        }
+    }
+
     private synchronized void flushCursors() {
         ledgers.values().forEach(mlfuture -> {
             if (mlfuture.isDone() && !mlfuture.isCompletedExceptionally()) {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 2559e62..4b62943 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2328,6 +2328,12 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     }
 
     void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) {
+        if (!factory.isMetadataServiceAvailable()) {
+            // Defer trimming of ledger if we cannot connect to metadata service
+            promise.complete(null);
+            return;
+        }
+
         // Ensure only one trimming operation is active
         if (!trimmerMutex.tryLock()) {
             scheduleDeferredTrimming(isTruncate, promise);
@@ -3386,6 +3392,11 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     }
 
     private boolean currentLedgerIsFull() {
+        if (!factory.isMetadataServiceAvailable()) {
+            // We don't want to trigger metadata operations if we already know that we're currently disconnected
+            return false;
+        }
+
         boolean spaceQuotaReached = (currentLedgerEntries >= config.getMaxEntriesPerLedger()
                 || currentLedgerSize >= (config.getMaxSizePerLedgerMb() * MegaByte));
 
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 1b8d7a5..4bfcf94 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -87,6 +87,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.apache.pulsar.metadata.api.extended.SessionEvent;
 import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.Stat;
@@ -3552,5 +3553,43 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
         Assert.assertEquals(managedCursor.getNumberOfEntriesInBacklog(false), 4);
     }
 
+    @Test
+    public void testCursorNoRolloverIfNoMetadataSession() throws Exception {
+        ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+        managedLedgerConfig.setMaxEntriesPerLedger(2);
+        managedLedgerConfig.setMetadataMaxEntriesPerLedger(2);
+        managedLedgerConfig.setMinimumRolloverTime(0, TimeUnit.MILLISECONDS);
+        managedLedgerConfig.setThrottleMarkDelete(0);
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testCursorNoRolloverIfNoMetadataSession", managedLedgerConfig);
+        ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("test");
+
+        List<Position> positions = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            positions.add(ledger.addEntry("test".getBytes(Encoding)));
+        }
+
+        cursor.delete(positions.get(0));
+
+        long initialLedgerId = cursor.getCursorLedger();
+
+        metadataStore.triggerSessionEvent(SessionEvent.SessionLost);
+
+        for (int i = 1; i < 10; i++) {
+            cursor.delete(positions.get(i));
+        }
+
+        assertEquals(cursor.getCursorLedger(), initialLedgerId);
+
+        // After the session gets reestablished, the rollover should restart
+        metadataStore.triggerSessionEvent(SessionEvent.SessionReestablished);
+
+        for (int i = 0; i < 10; i++) {
+            Position p = ledger.addEntry("test".getBytes(Encoding));
+            cursor.delete(p);
+        }
+
+        assertNotEquals(cursor.getCursorLedger(), initialLedgerId);
+    }
+
     private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class);
 }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java
index 7db48db..3c09a2a 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java
@@ -20,17 +20,14 @@ package org.apache.bookkeeper.mledger.impl;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-
+import io.netty.buffer.ByteBuf;
 import java.util.List;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReference;
-
-import io.netty.buffer.ByteBuf;
 import lombok.Cleanup;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.api.DigestType;
@@ -45,8 +42,8 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerFencedE
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
-import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.Test;
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryShutdownTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryShutdownTest.java
index a8b8169..8d1d4ea 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryShutdownTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryShutdownTest.java
@@ -43,8 +43,8 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ReadOnlyCursor;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.pulsar.metadata.api.GetResult;
-import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.Stat;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -59,7 +59,7 @@ public class ManagedLedgerFactoryShutdownTest {
         final long version = 0;
         final long createTimeMillis = System.currentTimeMillis();
 
-        MetadataStore metadataStore = mock(MetadataStore.class);
+        MetadataStoreExtended metadataStore = mock(MetadataStoreExtended.class);
         CountDownLatch slowZk = new CountDownLatch(1);
         given(metadataStore.get(any())).willAnswer(inv -> {
             String path = inv.getArgument(0, String.class);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index fc14a3e..f8776fd 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -110,6 +110,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.Ledge
 import org.apache.bookkeeper.mledger.util.Futures;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
+import org.apache.pulsar.metadata.api.extended.SessionEvent;
 import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.commons.lang3.mutable.MutableObject;
@@ -1781,6 +1782,61 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
     }
 
     @Test
+    public void testNoRolloverIfNoMetadataSession() throws Exception {
+        ManagedLedgerConfig conf = new ManagedLedgerConfig();
+        conf.setMaxEntriesPerLedger(1);
+        conf.setMinimumRolloverTime(0, TimeUnit.SECONDS);
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testNoRolloverIfNoMetadataSession", conf);
+        ledger.openCursor("c1");
+
+        metadataStore.triggerSessionEvent(SessionEvent.SessionLost);
+
+        for (int i = 1; i < 10; i++) {
+            ledger.addEntry("data".getBytes());
+        }
+
+        // This should not have changed
+        assertEquals(ledger.getLedgersInfoAsList().size(), 1);
+
+        metadataStore.triggerSessionEvent(SessionEvent.SessionReestablished);
+        ledger.addEntry("data".getBytes());
+        ledger.addEntry("data".getBytes());
+        ledger.addEntry("data".getBytes());
+
+        // After the re-establishment, we'll be creating new ledgers
+        assertEquals(ledger.getLedgersInfoAsList().size(), 3);
+    }
+
+    @Test
+    public void testNoRolloverIfNoMetadataSessionWithExistingData() throws Exception {
+        ManagedLedgerConfig conf = new ManagedLedgerConfig();
+        conf.setMaxEntriesPerLedger(2);
+        conf.setMinimumRolloverTime(0, TimeUnit.SECONDS);
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testNoRolloverIfNoMetadataSession", conf);
+        ledger.openCursor("c1");
+
+        ledger.addEntry("data".getBytes());
+
+        assertEquals(ledger.getLedgersInfoAsList().size(), 1);
+
+        metadataStore.triggerSessionEvent(SessionEvent.SessionLost);
+
+        for (int i = 1; i < 10; i++) {
+            ledger.addEntry("data".getBytes());
+        }
+
+        // This should not have changed
+        assertEquals(ledger.getLedgersInfoAsList().size(), 1);
+
+        metadataStore.triggerSessionEvent(SessionEvent.SessionReestablished);
+        ledger.addEntry("data".getBytes());
+        ledger.addEntry("data".getBytes());
+
+        // After the re-establishment, we'll be creating new ledgers
+        assertEquals(ledger.getLedgersInfoAsList().size(), 2);
+    }
+
+    @Test
     public void testRetention() throws Exception {
         @Cleanup("shutdown")
         ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
index e8bd07c..006c287 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -55,6 +55,7 @@ import org.apache.bookkeeper.replication.AutoRecoveryMain;
 import org.apache.commons.io.FileUtils;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.MetadataStoreFactory;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
 import org.apache.zookeeper.KeeperException;
 import org.awaitility.Awaitility;
@@ -140,7 +141,7 @@ public abstract class BookKeeperClusterTestCase {
     protected void startZKCluster(String path) throws Exception {
         zkUtil.startServer(path);
         metadataStore = new FaultInjectionMetadataStore(
-                MetadataStoreFactory.create(zkUtil.getZooKeeperConnectString(),
+                MetadataStoreExtended.create(zkUtil.getZooKeeperConnectString(),
                 MetadataStoreConfig.builder().build()));
     }
 
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
index a936165..4324105 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
@@ -29,6 +29,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.MetadataStoreFactory;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -68,7 +69,7 @@ public abstract class MockedBookKeeperTestCase {
     public final void setUp(Method method) throws Exception {
         LOG.info(">>>>>> starting {}", method);
         metadataStore = new FaultInjectionMetadataStore(
-                MetadataStoreFactory.create("memory://local", MetadataStoreConfig.builder().build()));
+                MetadataStoreExtended.create("memory://local", MetadataStoreConfig.builder().build()));
 
         try {
             // start bookkeeper service
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
index 012a755..d0da6a5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
@@ -38,6 +38,7 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.MetadataStoreFactory;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -97,7 +98,7 @@ public class PulsarClusterMetadataTeardown {
         }
 
         @Cleanup
-        MetadataStore metadataStore = MetadataStoreFactory.create(arguments.zookeeper,
+        MetadataStoreExtended metadataStore = MetadataStoreExtended.create(arguments.zookeeper,
                 MetadataStoreConfig.builder().sessionTimeoutMillis(arguments.zkSessionTimeoutMillis).build());
 
         if (arguments.bkMetadataServiceUri != null) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
index 52572cf..b615628 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
@@ -38,7 +38,7 @@ import org.apache.commons.configuration.Configuration;
 import org.apache.pulsar.broker.stats.prometheus.metrics.PrometheusMetricsProvider;
 import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
 import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
-import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,7 +53,7 @@ public class ManagedLedgerClientFactory implements ManagedLedgerStorage {
             bkEnsemblePolicyToBkClientMap = Maps.newConcurrentMap();
     private StatsProvider statsProvider = new NullStatsProvider();
 
-    public void initialize(ServiceConfiguration conf, MetadataStore metadataStore,
+    public void initialize(ServiceConfiguration conf, MetadataStoreExtended metadataStore,
                            ZooKeeper zkClient,
                            BookKeeperClientFactory bookkeeperProvider,
                            EventLoopGroup eventLoopGroup) throws Exception {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorage.java
index cc154b6..87e1ba6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorage.java
@@ -27,7 +27,7 @@ import org.apache.pulsar.broker.BookKeeperClientFactory;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.common.classification.InterfaceAudience.Private;
 import org.apache.pulsar.common.classification.InterfaceStability.Unstable;
-import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.zookeeper.ZooKeeper;
 
 /**
@@ -46,7 +46,7 @@ public interface ManagedLedgerStorage extends AutoCloseable {
      * @throws Exception
      */
     void initialize(ServiceConfiguration conf,
-                    MetadataStore metadataStore,
+                    MetadataStoreExtended metadataStore,
                     ZooKeeper zkClient,
                     BookKeeperClientFactory bookkeeperProvider,
                     EventLoopGroup eventLoopGroup) throws Exception;
@@ -88,7 +88,7 @@ public interface ManagedLedgerStorage extends AutoCloseable {
      * @return the initialized managed ledger storage.
      */
     static ManagedLedgerStorage create(ServiceConfiguration conf,
-                                       MetadataStore metadataStore,
+                                       MetadataStoreExtended metadataStore,
                                        ZooKeeper zkClient,
                                        BookKeeperClientFactory bkProvider,
                                        EventLoopGroup eventLoopGroup) throws Exception {
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
index 6236aed..65731c0 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.metadata.impl;
 
 import com.fasterxml.jackson.core.type.TypeReference;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -35,15 +36,19 @@ import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.Notification;
 import org.apache.pulsar.metadata.api.Stat;
+import org.apache.pulsar.metadata.api.extended.CreateOption;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.api.extended.SessionEvent;
 
 /**
  * Add possibility to inject failures during tests that interact with MetadataStore.
  */
-public class FaultInjectionMetadataStore implements MetadataStore {
+public class FaultInjectionMetadataStore implements MetadataStoreExtended {
 
-    private final MetadataStore store;
+    private final MetadataStoreExtended store;
     private final AtomicReference<MetadataStoreException> alwaysFail;
     private final CopyOnWriteArrayList<Failure> failures;
+    private final List<Consumer<SessionEvent>> sessionListeners = new CopyOnWriteArrayList<>();
 
     public enum OperationType {
         GET,
@@ -59,7 +64,7 @@ public class FaultInjectionMetadataStore implements MetadataStore {
         private final BiPredicate<OperationType, String> predicate;
     }
 
-    public FaultInjectionMetadataStore(MetadataStore store) {
+    public FaultInjectionMetadataStore(MetadataStoreExtended store) {
         this.store = store;
         this.failures = new CopyOnWriteArrayList<>();
         this.alwaysFail = new AtomicReference<>();
@@ -106,6 +111,17 @@ public class FaultInjectionMetadataStore implements MetadataStore {
     }
 
     @Override
+    public CompletableFuture<Stat> put(String path, byte[] value, Optional<Long> expectedVersion,
+                                       EnumSet<CreateOption> options) {
+        Optional<MetadataStoreException> ex = programmedFailure(OperationType.PUT, path);
+        if (ex.isPresent()) {
+            return FutureUtil.failedFuture(ex.get());
+        }
+
+        return store.put(path, value, expectedVersion, options);
+    }
+
+    @Override
     public CompletableFuture<Void> delete(String path, Optional<Long> expectedVersion) {
         Optional<MetadataStoreException> ex = programmedFailure(OperationType.DELETE, path);
         if (ex.isPresent()) {
@@ -146,6 +162,12 @@ public class FaultInjectionMetadataStore implements MetadataStore {
     }
 
     @Override
+    public void registerSessionListener(Consumer<SessionEvent> listener) {
+        store.registerSessionListener(listener);
+        sessionListeners.add(listener);
+    }
+
+    @Override
     public void close() throws Exception {
         store.close();
     }
@@ -162,6 +184,10 @@ public class FaultInjectionMetadataStore implements MetadataStore {
         this.alwaysFail.set(null);
     }
 
+    public void triggerSessionEvent(SessionEvent event) {
+        sessionListeners.forEach(l -> l.accept(event));
+    }
+
     private Optional<MetadataStoreException> programmedFailure(OperationType op, String path) {
         MetadataStoreException ex = this.alwaysFail.get();
         if (ex != null) {
diff --git a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java
index 4eeca7e..193858a 100644
--- a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java
+++ b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -24,6 +24,19 @@
 package org.apache.pulsar.packages.management.storage.bookkeeper.bookkeeper.test;
 
 import io.netty.buffer.ByteBufAllocator;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
@@ -36,35 +49,17 @@ import org.apache.bookkeeper.metastore.InMemoryMetaStore;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.replication.AutoRecoveryMain;
+import org.apache.bookkeeper.test.ZooKeeperUtil;
 import org.apache.commons.io.FileUtils;
-import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
-import org.apache.pulsar.metadata.api.MetadataStoreFactory;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
-import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 
-import java.io.File;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import org.apache.bookkeeper.test.ZooKeeperUtil;
-
 /**
  * A class runs several bookie servers for testing.
  */
@@ -107,7 +102,7 @@ public abstract class BookKeeperClusterTestCase {
             // start zookeeper service
             startZKCluster();
 
-            metadataStore = new FaultInjectionMetadataStore(MetadataStoreFactory.create(
+            metadataStore = new FaultInjectionMetadataStore(MetadataStoreExtended.create(
                     zkUtil.getZooKeeperConnectString(),
                     MetadataStoreConfig.builder().build()));
 
diff --git a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/MockedBookKeeperTestCase.java b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/MockedBookKeeperTestCase.java
index 90a2184..9186999 100644
--- a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/MockedBookKeeperTestCase.java
+++ b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/MockedBookKeeperTestCase.java
@@ -18,6 +18,10 @@
  */
 package org.apache.pulsar.packages.management.storage.bookkeeper.bookkeeper.test;
 
+import java.lang.reflect.Method;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import org.apache.bookkeeper.client.PulsarMockBookKeeper;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -25,7 +29,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
-import org.apache.pulsar.metadata.api.MetadataStoreFactory;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,11 +38,6 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 
-import java.lang.reflect.Method;
-import java.util.Optional;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
 /**
  * A class runs several bookie servers for testing.
  */
@@ -71,7 +70,7 @@ public abstract class MockedBookKeeperTestCase {
     @BeforeMethod(alwaysRun = true)
     public void setUp(Method method) throws Exception {
         LOG.info(">>>>>> starting {}", method);
-        metadataStore = new FaultInjectionMetadataStore(MetadataStoreFactory.create("memory://local",
+        metadataStore = new FaultInjectionMetadataStore(MetadataStoreExtended.create("memory://local",
                 MetadataStoreConfig.builder().build()));
         try {
             // start bookkeeper service
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
index 4577099..bf823de 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
@@ -42,9 +42,8 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
-import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
-import org.apache.pulsar.metadata.api.MetadataStoreFactory;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 
 /**
  * Implementation of a cache for the Pulsar connector.
@@ -56,7 +55,7 @@ public class PulsarConnectorCache {
     @VisibleForTesting
     static PulsarConnectorCache instance;
 
-    private final MetadataStore metadataStore;
+    private final MetadataStoreExtended metadataStore;
     private final ManagedLedgerFactory managedLedgerFactory;
 
     private final StatsProvider statsProvider;
@@ -71,7 +70,7 @@ public class PulsarConnectorCache {
 
 
     private PulsarConnectorCache(PulsarConnectorConfig pulsarConnectorConfig) throws Exception {
-        this.metadataStore = MetadataStoreFactory.create(pulsarConnectorConfig.getZookeeperUri(),
+        this.metadataStore = MetadataStoreExtended.create(pulsarConnectorConfig.getZookeeperUri(),
                 MetadataStoreConfig.builder().build());
         this.managedLedgerFactory = initManagedLedgerFactory(pulsarConnectorConfig);
         this.statsProvider = PulsarConnectorUtils.createInstance(pulsarConnectorConfig.getStatsProvider(),
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
index 617e11a..464c59e 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
@@ -19,7 +19,6 @@
 package org.apache.pulsar.testclient;
 
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
-
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.ParameterException;
@@ -27,10 +26,8 @@ import com.beust.jcommander.Parameters;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.ObjectWriter;
 import com.google.common.util.concurrent.RateLimiter;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.util.concurrent.DefaultThreadFactory;
-
 import java.text.DecimalFormat;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -46,7 +43,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.stream.Collectors;
-
 import lombok.Cleanup;
 import org.HdrHistogram.Histogram;
 import org.HdrHistogram.Recorder;
@@ -63,9 +59,8 @@ import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
-import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
-import org.apache.pulsar.metadata.api.MetadataStoreFactory;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.testclient.utils.PaddingDecimalFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -177,7 +172,7 @@ public class ManagedLedgerWriter {
         mlFactoryConf.setMaxCacheSize(0);
 
         @Cleanup
-        MetadataStore metadataStore = MetadataStoreFactory.create(arguments.zookeeperServers,
+        MetadataStoreExtended metadataStore = MetadataStoreExtended.create(arguments.zookeeperServers,
                 MetadataStoreConfig.builder().build());
         ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkConf, mlFactoryConf);
 
diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/test/MockedBookKeeperTestCase.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/test/MockedBookKeeperTestCase.java
index 9a94857..8edb2ff 100644
--- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/test/MockedBookKeeperTestCase.java
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/test/MockedBookKeeperTestCase.java
@@ -18,20 +18,19 @@
  */
 package org.apache.pulsar.transaction.coordinator.test;
 
+import java.lang.reflect.Method;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import org.apache.bookkeeper.client.PulsarMockBookKeeper;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
-import org.apache.bookkeeper.util.ZkUtils;
-import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
-import org.apache.pulsar.metadata.api.MetadataStoreFactory;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.MockZooKeeper;
-import org.apache.zookeeper.ZooDefs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterClass;
@@ -39,11 +38,6 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 
-import java.lang.reflect.Method;
-import java.util.Optional;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
 /**
  * A class runs several bookie servers for testing.
  */
@@ -76,7 +70,7 @@ public abstract class MockedBookKeeperTestCase {
     @BeforeMethod(alwaysRun = true)
     public void setUp(Method method) throws Exception {
         LOG.info(">>>>>> starting {}", method);
-        metadataStore = new FaultInjectionMetadataStore(MetadataStoreFactory.create("memory://local",
+        metadataStore = new FaultInjectionMetadataStore(MetadataStoreExtended.create("memory://local",
                 MetadataStoreConfig.builder().build()));
         try {
             // start bookkeeper service