You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/09/21 03:34:41 UTC
[pulsar] branch master updated: ManagedLedger should not attempt
deferrable metadata operation while disconnected (#12101)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4bc3c40 ManagedLedger should not attempt deferrable metadata operation while disconnected (#12101)
4bc3c40 is described below
commit 4bc3c405a565b1c756b9b70ff02a63ea06c32c0d
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Sep 20 20:33:20 2021 -0700
ManagedLedger should not attempt deferrable metadata operation while disconnected (#12101)
* ManagedLedger should not attempt deferrable metadata operation while disconnected
* Fixed compilation
* Remove unused imports
* Fixed ManagedLedgerWriter
* Fixed compilation in presto connector
* Also defer ledger trimmings
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 3 +-
.../mledger/impl/ManagedLedgerFactoryImpl.java | 43 +++++++++++++----
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 11 +++++
.../bookkeeper/mledger/impl/ManagedCursorTest.java | 39 +++++++++++++++
.../mledger/impl/ManagedLedgerErrorsTest.java | 9 ++--
.../impl/ManagedLedgerFactoryShutdownTest.java | 4 +-
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 56 ++++++++++++++++++++++
.../bookkeeper/test/BookKeeperClusterTestCase.java | 3 +-
.../bookkeeper/test/MockedBookKeeperTestCase.java | 3 +-
.../pulsar/PulsarClusterMetadataTeardown.java | 3 +-
.../pulsar/broker/ManagedLedgerClientFactory.java | 4 +-
.../broker/storage/ManagedLedgerStorage.java | 6 +--
.../metadata/impl/FaultInjectionMetadataStore.java | 32 +++++++++++--
.../bookkeeper/test/BookKeeperClusterTestCase.java | 37 +++++++-------
.../bookkeeper/test/MockedBookKeeperTestCase.java | 13 +++--
.../pulsar/sql/presto/PulsarConnectorCache.java | 7 ++-
.../pulsar/testclient/ManagedLedgerWriter.java | 9 +---
.../coordinator/test/MockedBookKeeperTestCase.java | 18 +++----
18 files changed, 221 insertions(+), 79 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 82b4cfc..516c2af 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -2653,7 +2653,8 @@ public class ManagedCursorImpl implements ManagedCursor {
boolean shouldCloseLedger(LedgerHandle lh) {
long now = clock.millis();
- if ((lh.getLastAddConfirmed() >= config.getMetadataMaxEntriesPerLedger()
+ if (ledger.factory.isMetadataServiceAvailable() &&
+ (lh.getLastAddConfirmed() >= config.getMetadataMaxEntriesPerLedger()
|| lastLedgerSwitchTimestamp < (now - config.getLedgerRolloverTimeout() * 1000))
&& (STATE_UPDATER.get(this) != State.Closed && STATE_UPDATER.get(this) != State.Closing)) {
// It's safe to modify the timestamp since this method will be only called from a callback, implying that
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 389a43b..59be069 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -38,6 +38,7 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import lombok.Getter;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.common.util.OrderedScheduler;
@@ -77,6 +78,8 @@ import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.Stat;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -106,6 +109,12 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
//indicate whether shutdown() is called.
private volatile boolean closed;
+ /**
+ * Keep a flag to indicate whether we're currently connected to the metadata service
+ */
+ @Getter
+ private boolean metadataServiceAvailable;
+
private static class PendingInitializeManagedLedger {
private final ManagedLedgerImpl ledger;
@@ -118,31 +127,31 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
}
- public ManagedLedgerFactoryImpl(MetadataStore metadataStore, ClientConfiguration bkClientConfiguration)
+ public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, ClientConfiguration bkClientConfiguration)
throws Exception {
this(metadataStore, bkClientConfiguration, new ManagedLedgerFactoryConfig());
}
@SuppressWarnings("deprecation")
- public ManagedLedgerFactoryImpl(MetadataStore metadataStore, ClientConfiguration bkClientConfiguration,
+ public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, ClientConfiguration bkClientConfiguration,
ManagedLedgerFactoryConfig config)
throws Exception {
this(metadataStore, new DefaultBkFactory(bkClientConfiguration),
true /* isBookkeeperManaged */, config, NullStatsLogger.INSTANCE);
}
- public ManagedLedgerFactoryImpl(MetadataStore metadataStore, BookKeeper bookKeeper)
+ public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, BookKeeper bookKeeper)
throws Exception {
this(metadataStore, bookKeeper, new ManagedLedgerFactoryConfig());
}
- public ManagedLedgerFactoryImpl(MetadataStore metadataStore, BookKeeper bookKeeper,
+ public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, BookKeeper bookKeeper,
ManagedLedgerFactoryConfig config)
throws Exception {
this(metadataStore, (policyConfig) -> bookKeeper, config);
}
- public ManagedLedgerFactoryImpl(MetadataStore metadataStore,
+ public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperGroupFactory,
ManagedLedgerFactoryConfig config)
throws Exception {
@@ -150,7 +159,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
config, NullStatsLogger.INSTANCE);
}
- public ManagedLedgerFactoryImpl(MetadataStore metadataStore,
+ public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperGroupFactory,
ManagedLedgerFactoryConfig config, StatsLogger statsLogger)
throws Exception {
@@ -158,7 +167,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
config, statsLogger);
}
- private ManagedLedgerFactoryImpl(MetadataStore metadataStore,
+ private ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperGroupFactory,
boolean isBookkeeperManaged,
ManagedLedgerFactoryConfig config, StatsLogger statsLogger) throws Exception {
@@ -170,7 +179,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
.build();
cacheEvictionExecutor = Executors
.newSingleThreadExecutor(new DefaultThreadFactory("bookkeeper-ml-cache-eviction"));
-
+ this.metadataServiceAvailable = true;
this.bookkeeperFactory = bookKeeperGroupFactory;
this.isBookkeeperManaged = isBookkeeperManaged;
this.metadataStore = metadataStore;
@@ -190,6 +199,8 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
cacheEvictionExecutor.execute(this::cacheEvictionTask);
closed = false;
+
+ metadataStore.registerSessionListener(this::handleMetadataStoreNotification);
}
static class DefaultBkFactory implements BookkeeperFactoryForCustomEnsemblePlacementPolicy {
@@ -207,6 +218,22 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
}
}
+ private synchronized void handleMetadataStoreNotification(SessionEvent e) {
+ log.info("Received MetadataStore session event: {}", e);
+
+ switch (e) {
+ case ConnectionLost:
+ case SessionLost:
+ metadataServiceAvailable = false;
+ break;
+
+ case Reconnected:
+ case SessionReestablished:
+ metadataServiceAvailable = true;
+ break;
+ }
+ }
+
private synchronized void flushCursors() {
ledgers.values().forEach(mlfuture -> {
if (mlfuture.isDone() && !mlfuture.isCompletedExceptionally()) {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 2559e62..4b62943 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2328,6 +2328,12 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
}
void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) {
+ if (!factory.isMetadataServiceAvailable()) {
+ // Defer trimming of ledger if we cannot connect to metadata service
+ promise.complete(null);
+ return;
+ }
+
// Ensure only one trimming operation is active
if (!trimmerMutex.tryLock()) {
scheduleDeferredTrimming(isTruncate, promise);
@@ -3386,6 +3392,11 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
}
private boolean currentLedgerIsFull() {
+ if (!factory.isMetadataServiceAvailable()) {
+ // We don't want to trigger metadata operations if we already know that we're currently disconnected
+ return false;
+ }
+
boolean spaceQuotaReached = (currentLedgerEntries >= config.getMaxEntriesPerLedger()
|| currentLedgerSize >= (config.getMaxSizePerLedgerMb() * MegaByte));
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 1b8d7a5..4bfcf94 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -87,6 +87,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Stat;
@@ -3552,5 +3553,43 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
Assert.assertEquals(managedCursor.getNumberOfEntriesInBacklog(false), 4);
}
+ @Test
+ public void testCursorNoRolloverIfNoMetadataSession() throws Exception {
+ ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+ managedLedgerConfig.setMaxEntriesPerLedger(2);
+ managedLedgerConfig.setMetadataMaxEntriesPerLedger(2);
+ managedLedgerConfig.setMinimumRolloverTime(0, TimeUnit.MILLISECONDS);
+ managedLedgerConfig.setThrottleMarkDelete(0);
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testCursorNoRolloverIfNoMetadataSession", managedLedgerConfig);
+ ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("test");
+
+ List<Position> positions = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ positions.add(ledger.addEntry("test".getBytes(Encoding)));
+ }
+
+ cursor.delete(positions.get(0));
+
+ long initialLedgerId = cursor.getCursorLedger();
+
+ metadataStore.triggerSessionEvent(SessionEvent.SessionLost);
+
+ for (int i = 1; i < 10; i++) {
+ cursor.delete(positions.get(i));
+ }
+
+ assertEquals(cursor.getCursorLedger(), initialLedgerId);
+
+ // After the session gets reestablished, the rollover should restart
+ metadataStore.triggerSessionEvent(SessionEvent.SessionReestablished);
+
+ for (int i = 0; i < 10; i++) {
+ Position p = ledger.addEntry("test".getBytes(Encoding));
+ cursor.delete(p);
+ }
+
+ assertNotEquals(cursor.getCursorLedger(), initialLedgerId);
+ }
+
private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class);
}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java
index 7db48db..3c09a2a 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java
@@ -20,17 +20,14 @@ package org.apache.bookkeeper.mledger.impl;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
-
+import io.netty.buffer.ByteBuf;
import java.util.List;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
-
-import io.netty.buffer.ByteBuf;
import lombok.Cleanup;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.api.DigestType;
@@ -45,8 +42,8 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerFencedE
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
-import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.Test;
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryShutdownTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryShutdownTest.java
index a8b8169..8d1d4ea 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryShutdownTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryShutdownTest.java
@@ -43,8 +43,8 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ReadOnlyCursor;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.pulsar.metadata.api.GetResult;
-import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.Stat;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -59,7 +59,7 @@ public class ManagedLedgerFactoryShutdownTest {
final long version = 0;
final long createTimeMillis = System.currentTimeMillis();
- MetadataStore metadataStore = mock(MetadataStore.class);
+ MetadataStoreExtended metadataStore = mock(MetadataStoreExtended.class);
CountDownLatch slowZk = new CountDownLatch(1);
given(metadataStore.get(any())).willAnswer(inv -> {
String path = inv.getArgument(0, String.class);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index fc14a3e..f8776fd 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -110,6 +110,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.Ledge
import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
+import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.mutable.MutableObject;
@@ -1781,6 +1782,61 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
}
@Test
+ public void testNoRolloverIfNoMetadataSession() throws Exception {
+ ManagedLedgerConfig conf = new ManagedLedgerConfig();
+ conf.setMaxEntriesPerLedger(1);
+ conf.setMinimumRolloverTime(0, TimeUnit.SECONDS);
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testNoRolloverIfNoMetadataSession", conf);
+ ledger.openCursor("c1");
+
+ metadataStore.triggerSessionEvent(SessionEvent.SessionLost);
+
+ for (int i = 1; i < 10; i++) {
+ ledger.addEntry("data".getBytes());
+ }
+
+ // This should not have changed
+ assertEquals(ledger.getLedgersInfoAsList().size(), 1);
+
+ metadataStore.triggerSessionEvent(SessionEvent.SessionReestablished);
+ ledger.addEntry("data".getBytes());
+ ledger.addEntry("data".getBytes());
+ ledger.addEntry("data".getBytes());
+
+ // After the re-establishment, we'll be creating new ledgers
+ assertEquals(ledger.getLedgersInfoAsList().size(), 3);
+ }
+
+ @Test
+ public void testNoRolloverIfNoMetadataSessionWithExistingData() throws Exception {
+ ManagedLedgerConfig conf = new ManagedLedgerConfig();
+ conf.setMaxEntriesPerLedger(2);
+ conf.setMinimumRolloverTime(0, TimeUnit.SECONDS);
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testNoRolloverIfNoMetadataSession", conf);
+ ledger.openCursor("c1");
+
+ ledger.addEntry("data".getBytes());
+
+ assertEquals(ledger.getLedgersInfoAsList().size(), 1);
+
+ metadataStore.triggerSessionEvent(SessionEvent.SessionLost);
+
+ for (int i = 1; i < 10; i++) {
+ ledger.addEntry("data".getBytes());
+ }
+
+ // This should not have changed
+ assertEquals(ledger.getLedgersInfoAsList().size(), 1);
+
+ metadataStore.triggerSessionEvent(SessionEvent.SessionReestablished);
+ ledger.addEntry("data".getBytes());
+ ledger.addEntry("data".getBytes());
+
+ // After the re-establishment, we'll be creating new ledgers
+ assertEquals(ledger.getLedgersInfoAsList().size(), 2);
+ }
+
+ @Test
public void testRetention() throws Exception {
@Cleanup("shutdown")
ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
index e8bd07c..006c287 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -55,6 +55,7 @@ import org.apache.bookkeeper.replication.AutoRecoveryMain;
import org.apache.commons.io.FileUtils;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreFactory;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
import org.apache.zookeeper.KeeperException;
import org.awaitility.Awaitility;
@@ -140,7 +141,7 @@ public abstract class BookKeeperClusterTestCase {
protected void startZKCluster(String path) throws Exception {
zkUtil.startServer(path);
metadataStore = new FaultInjectionMetadataStore(
- MetadataStoreFactory.create(zkUtil.getZooKeeperConnectString(),
+ MetadataStoreExtended.create(zkUtil.getZooKeeperConnectString(),
MetadataStoreConfig.builder().build()));
}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
index a936165..4324105 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java
@@ -29,6 +29,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreFactory;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,7 +69,7 @@ public abstract class MockedBookKeeperTestCase {
public final void setUp(Method method) throws Exception {
LOG.info(">>>>>> starting {}", method);
metadataStore = new FaultInjectionMetadataStore(
- MetadataStoreFactory.create("memory://local", MetadataStoreConfig.builder().build()));
+ MetadataStoreExtended.create("memory://local", MetadataStoreConfig.builder().build()));
try {
// start bookkeeper service
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
index 012a755..d0da6a5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
@@ -38,6 +38,7 @@ import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreFactory;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -97,7 +98,7 @@ public class PulsarClusterMetadataTeardown {
}
@Cleanup
- MetadataStore metadataStore = MetadataStoreFactory.create(arguments.zookeeper,
+ MetadataStoreExtended metadataStore = MetadataStoreExtended.create(arguments.zookeeper,
MetadataStoreConfig.builder().sessionTimeoutMillis(arguments.zkSessionTimeoutMillis).build());
if (arguments.bkMetadataServiceUri != null) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
index 52572cf..b615628 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
@@ -38,7 +38,7 @@ import org.apache.commons.configuration.Configuration;
import org.apache.pulsar.broker.stats.prometheus.metrics.PrometheusMetricsProvider;
import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
-import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,7 +53,7 @@ public class ManagedLedgerClientFactory implements ManagedLedgerStorage {
bkEnsemblePolicyToBkClientMap = Maps.newConcurrentMap();
private StatsProvider statsProvider = new NullStatsProvider();
- public void initialize(ServiceConfiguration conf, MetadataStore metadataStore,
+ public void initialize(ServiceConfiguration conf, MetadataStoreExtended metadataStore,
ZooKeeper zkClient,
BookKeeperClientFactory bookkeeperProvider,
EventLoopGroup eventLoopGroup) throws Exception {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorage.java
index cc154b6..87e1ba6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorage.java
@@ -27,7 +27,7 @@ import org.apache.pulsar.broker.BookKeeperClientFactory;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.common.classification.InterfaceAudience.Private;
import org.apache.pulsar.common.classification.InterfaceStability.Unstable;
-import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.zookeeper.ZooKeeper;
/**
@@ -46,7 +46,7 @@ public interface ManagedLedgerStorage extends AutoCloseable {
* @throws Exception
*/
void initialize(ServiceConfiguration conf,
- MetadataStore metadataStore,
+ MetadataStoreExtended metadataStore,
ZooKeeper zkClient,
BookKeeperClientFactory bookkeeperProvider,
EventLoopGroup eventLoopGroup) throws Exception;
@@ -88,7 +88,7 @@ public interface ManagedLedgerStorage extends AutoCloseable {
* @return the initialized managed ledger storage.
*/
static ManagedLedgerStorage create(ServiceConfiguration conf,
- MetadataStore metadataStore,
+ MetadataStoreExtended metadataStore,
ZooKeeper zkClient,
BookKeeperClientFactory bkProvider,
EventLoopGroup eventLoopGroup) throws Exception {
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
index 6236aed..65731c0 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.metadata.impl;
import com.fasterxml.jackson.core.type.TypeReference;
+import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -35,15 +36,19 @@ import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.Stat;
+import org.apache.pulsar.metadata.api.extended.CreateOption;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.api.extended.SessionEvent;
/**
* Add possibility to inject failures during tests that interact with MetadataStore.
*/
-public class FaultInjectionMetadataStore implements MetadataStore {
+public class FaultInjectionMetadataStore implements MetadataStoreExtended {
- private final MetadataStore store;
+ private final MetadataStoreExtended store;
private final AtomicReference<MetadataStoreException> alwaysFail;
private final CopyOnWriteArrayList<Failure> failures;
+ private final List<Consumer<SessionEvent>> sessionListeners = new CopyOnWriteArrayList<>();
public enum OperationType {
GET,
@@ -59,7 +64,7 @@ public class FaultInjectionMetadataStore implements MetadataStore {
private final BiPredicate<OperationType, String> predicate;
}
- public FaultInjectionMetadataStore(MetadataStore store) {
+ public FaultInjectionMetadataStore(MetadataStoreExtended store) {
this.store = store;
this.failures = new CopyOnWriteArrayList<>();
this.alwaysFail = new AtomicReference<>();
@@ -106,6 +111,17 @@ public class FaultInjectionMetadataStore implements MetadataStore {
}
@Override
+ public CompletableFuture<Stat> put(String path, byte[] value, Optional<Long> expectedVersion,
+ EnumSet<CreateOption> options) {
+ Optional<MetadataStoreException> ex = programmedFailure(OperationType.PUT, path);
+ if (ex.isPresent()) {
+ return FutureUtil.failedFuture(ex.get());
+ }
+
+ return store.put(path, value, expectedVersion, options);
+ }
+
+ @Override
public CompletableFuture<Void> delete(String path, Optional<Long> expectedVersion) {
Optional<MetadataStoreException> ex = programmedFailure(OperationType.DELETE, path);
if (ex.isPresent()) {
@@ -146,6 +162,12 @@ public class FaultInjectionMetadataStore implements MetadataStore {
}
@Override
+ public void registerSessionListener(Consumer<SessionEvent> listener) {
+ store.registerSessionListener(listener);
+ sessionListeners.add(listener);
+ }
+
+ @Override
public void close() throws Exception {
store.close();
}
@@ -162,6 +184,10 @@ public class FaultInjectionMetadataStore implements MetadataStore {
this.alwaysFail.set(null);
}
+ public void triggerSessionEvent(SessionEvent event) {
+ sessionListeners.forEach(l -> l.accept(event));
+ }
+
private Optional<MetadataStoreException> programmedFailure(OperationType op, String path) {
MetadataStoreException ex = this.alwaysFail.get();
if (ex != null) {
diff --git a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java
index 4eeca7e..193858a 100644
--- a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java
+++ b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -24,6 +24,19 @@
package org.apache.pulsar.packages.management.storage.bookkeeper.bookkeeper.test;
import io.netty.buffer.ByteBufAllocator;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
@@ -36,35 +49,17 @@ import org.apache.bookkeeper.metastore.InMemoryMetaStore;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.replication.AutoRecoveryMain;
+import org.apache.bookkeeper.test.ZooKeeperUtil;
import org.apache.commons.io.FileUtils;
-import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
-import org.apache.pulsar.metadata.api.MetadataStoreFactory;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
-import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
-import java.io.File;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import org.apache.bookkeeper.test.ZooKeeperUtil;
-
/**
* A class runs several bookie servers for testing.
*/
@@ -107,7 +102,7 @@ public abstract class BookKeeperClusterTestCase {
// start zookeeper service
startZKCluster();
- metadataStore = new FaultInjectionMetadataStore(MetadataStoreFactory.create(
+ metadataStore = new FaultInjectionMetadataStore(MetadataStoreExtended.create(
zkUtil.getZooKeeperConnectString(),
MetadataStoreConfig.builder().build()));
diff --git a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/MockedBookKeeperTestCase.java b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/MockedBookKeeperTestCase.java
index 90a2184..9186999 100644
--- a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/MockedBookKeeperTestCase.java
+++ b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/MockedBookKeeperTestCase.java
@@ -18,6 +18,10 @@
*/
package org.apache.pulsar.packages.management.storage.bookkeeper.bookkeeper.test;
+import java.lang.reflect.Method;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.apache.bookkeeper.client.PulsarMockBookKeeper;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -25,7 +29,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
-import org.apache.pulsar.metadata.api.MetadataStoreFactory;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,11 +38,6 @@ import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
-import java.lang.reflect.Method;
-import java.util.Optional;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
/**
* A class runs several bookie servers for testing.
*/
@@ -71,7 +70,7 @@ public abstract class MockedBookKeeperTestCase {
@BeforeMethod(alwaysRun = true)
public void setUp(Method method) throws Exception {
LOG.info(">>>>>> starting {}", method);
- metadataStore = new FaultInjectionMetadataStore(MetadataStoreFactory.create("memory://local",
+ metadataStore = new FaultInjectionMetadataStore(MetadataStoreExtended.create("memory://local",
MetadataStoreConfig.builder().build()));
try {
// start bookkeeper service
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
index 4577099..bf823de 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
@@ -42,9 +42,8 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
-import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
-import org.apache.pulsar.metadata.api.MetadataStoreFactory;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
/**
* Implementation of a cache for the Pulsar connector.
@@ -56,7 +55,7 @@ public class PulsarConnectorCache {
@VisibleForTesting
static PulsarConnectorCache instance;
- private final MetadataStore metadataStore;
+ private final MetadataStoreExtended metadataStore;
private final ManagedLedgerFactory managedLedgerFactory;
private final StatsProvider statsProvider;
@@ -71,7 +70,7 @@ public class PulsarConnectorCache {
private PulsarConnectorCache(PulsarConnectorConfig pulsarConnectorConfig) throws Exception {
- this.metadataStore = MetadataStoreFactory.create(pulsarConnectorConfig.getZookeeperUri(),
+ this.metadataStore = MetadataStoreExtended.create(pulsarConnectorConfig.getZookeeperUri(),
MetadataStoreConfig.builder().build());
this.managedLedgerFactory = initManagedLedgerFactory(pulsarConnectorConfig);
this.statsProvider = PulsarConnectorUtils.createInstance(pulsarConnectorConfig.getStatsProvider(),
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
index 617e11a..464c59e 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
@@ -19,7 +19,6 @@
package org.apache.pulsar.testclient;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
-
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
@@ -27,10 +26,8 @@ import com.beust.jcommander.Parameters;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.common.util.concurrent.RateLimiter;
-
import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.DefaultThreadFactory;
-
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Collections;
@@ -46,7 +43,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;
-
import lombok.Cleanup;
import org.HdrHistogram.Histogram;
import org.HdrHistogram.Recorder;
@@ -63,9 +59,8 @@ import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
-import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
-import org.apache.pulsar.metadata.api.MetadataStoreFactory;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.testclient.utils.PaddingDecimalFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -177,7 +172,7 @@ public class ManagedLedgerWriter {
mlFactoryConf.setMaxCacheSize(0);
@Cleanup
- MetadataStore metadataStore = MetadataStoreFactory.create(arguments.zookeeperServers,
+ MetadataStoreExtended metadataStore = MetadataStoreExtended.create(arguments.zookeeperServers,
MetadataStoreConfig.builder().build());
ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkConf, mlFactoryConf);
diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/test/MockedBookKeeperTestCase.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/test/MockedBookKeeperTestCase.java
index 9a94857..8edb2ff 100644
--- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/test/MockedBookKeeperTestCase.java
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/test/MockedBookKeeperTestCase.java
@@ -18,20 +18,19 @@
*/
package org.apache.pulsar.transaction.coordinator.test;
+import java.lang.reflect.Method;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.apache.bookkeeper.client.PulsarMockBookKeeper;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
-import org.apache.bookkeeper.util.ZkUtils;
-import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
-import org.apache.pulsar.metadata.api.MetadataStoreFactory;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.MockZooKeeper;
-import org.apache.zookeeper.ZooDefs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterClass;
@@ -39,11 +38,6 @@ import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
-import java.lang.reflect.Method;
-import java.util.Optional;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
/**
* A class runs several bookie servers for testing.
*/
@@ -76,7 +70,7 @@ public abstract class MockedBookKeeperTestCase {
@BeforeMethod(alwaysRun = true)
public void setUp(Method method) throws Exception {
LOG.info(">>>>>> starting {}", method);
- metadataStore = new FaultInjectionMetadataStore(MetadataStoreFactory.create("memory://local",
+ metadataStore = new FaultInjectionMetadataStore(MetadataStoreExtended.create("memory://local",
MetadataStoreConfig.builder().build()));
try {
// start bookkeeper service