You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/08/03 04:37:53 UTC
[pulsar] branch master updated: [fix][broker] PulsarLedgerManager to pass correct error code to BK client (#16857)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 2e8bd3d7b17 [fix][broker] PulsarLedgerManager to pass correct error code to BK client (#16857)
2e8bd3d7b17 is described below
commit 2e8bd3d7b17190d6fb45d5b35eff598948975385
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Tue Aug 2 21:37:46 2022 -0700
[fix][broker] PulsarLedgerManager to pass correct error code to BK client (#16857)
---
.../mledger/impl/ManagedLedgerFactoryImpl.java | 29 +++-
.../broker/service/BrokerBkEnsemblesTests.java | 150 ++++++++++++++++++++-
.../metadata/bookkeeper/PulsarLedgerManager.java | 65 +++++++--
3 files changed, 231 insertions(+), 13 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 629e96ba3e3..eddf97558c2 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -876,7 +876,19 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
DeleteLedgerCallback callback, Object ctx) {
Futures.waitForAll(info.ledgers.stream()
.filter(li -> !li.isOffloaded)
- .map(li -> bkc.newDeleteLedgerOp().withLedgerId(li.ledgerId).execute())
+ .map(li -> bkc.newDeleteLedgerOp().withLedgerId(li.ledgerId).execute()
+ .handle((result, ex) -> {
+ if (ex != null) {
+ int rc = BKException.getExceptionCode(ex);
+ if (rc == BKException.Code.NoSuchLedgerExistsOnMetadataServerException
+ || rc == BKException.Code.NoSuchLedgerExistsException) {
+ log.info("Ledger {} does not exist, ignoring", li.ledgerId);
+ return null;
+ }
+ throw new CompletionException(ex);
+ }
+ return result;
+ }))
.collect(Collectors.toList()))
.thenRun(() -> {
// Delete the metadata
@@ -904,7 +916,20 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
// Delete the cursor ledger if present
if (cursor.cursorsLedgerId != -1) {
- cursorLedgerDeleteFuture = bkc.newDeleteLedgerOp().withLedgerId(cursor.cursorsLedgerId).execute();
+ cursorLedgerDeleteFuture = bkc.newDeleteLedgerOp().withLedgerId(cursor.cursorsLedgerId)
+ .execute()
+ .handle((result, ex) -> {
+ if (ex != null) {
+ int rc = BKException.getExceptionCode(ex);
+ if (rc == BKException.Code.NoSuchLedgerExistsOnMetadataServerException
+ || rc == BKException.Code.NoSuchLedgerExistsException) {
+ log.info("Ledger {} does not exist, ignoring", cursor.cursorsLedgerId);
+ return null;
+ }
+ throw new CompletionException(ex);
+ }
+ return result;
+ });
} else {
cursorLedgerDeleteFuture = CompletableFuture.completedFuture(null);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
index 8db3734eabe..612d9368b8c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.service;
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.fail;
import java.lang.reflect.Field;
@@ -31,9 +32,13 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import com.google.common.collect.Sets;
import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
@@ -47,12 +52,14 @@ import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.zookeeper.ZooKeeper;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.Test;
@Test(groups = "broker")
+@Slf4j
public class BrokerBkEnsemblesTests extends BkEnsemblesTestBase {
public BrokerBkEnsemblesTests() {
@@ -238,7 +245,7 @@ public class BrokerBkEnsemblesTests extends BkEnsemblesTestBase {
try {
bookKeeper.deleteLedger(entry.getKey());
} catch (Exception e) {
- e.printStackTrace();
+ log.warn("failed to delete ledger {}", entry.getKey(), e);
}
}
});
@@ -279,6 +286,147 @@ public class BrokerBkEnsemblesTests extends BkEnsemblesTestBase {
consumer.close();
}
+ /**
+  * Verifies that a topic can still be truncated after some of its data ledgers were deleted
+  * directly in BookKeeper, and that deleting an already-absent ledger afterwards surfaces at
+  * most a "no such ledger" error rather than an unexpected BookKeeper failure.
+  */
+ @Test
+ public void testTruncateCorruptDataLedger() throws Exception {
+     // Ensure intended state for autoSkipNonRecoverableData
+     admin.brokers().updateDynamicConfiguration("autoSkipNonRecoverableData", "false");
+
+     @Cleanup
+     PulsarClient client = PulsarClient.builder()
+             .serviceUrl(pulsar.getWebServiceAddress())
+             .statsInterval(0, TimeUnit.SECONDS)
+             .build();
+
+     final int totalMessages = 100;
+     final int totalDataLedgers = 5;
+     final int entriesPerLedger = totalMessages / totalDataLedgers;
+
+     final String tenant = "prop";
+     try {
+         admin.tenants().createTenant(tenant, new TenantInfoImpl(Sets.newHashSet("role1", "role2"),
+                 Sets.newHashSet(config.getClusterName())));
+     } catch (Exception e) {
+         // Best effort: presumably the tenant already exists (e.g. created by another test).
+         log.debug("ignoring failure to create tenant {}", tenant, e);
+     }
+     final String ns1 = tenant + "/crash-broker";
+     try {
+         admin.namespaces().createNamespace(ns1, Sets.newHashSet(config.getClusterName()));
+     } catch (Exception e) {
+         // Best effort: presumably the namespace already exists (e.g. created by another test).
+         log.debug("ignoring failure to create namespace {}", ns1, e);
+     }
+
+     final String topic1 = "persistent://" + ns1 + "/my-topic-" + System.currentTimeMillis();
+
+     // Create subscription
+     Consumer<byte[]> consumer = client.newConsumer().topic(topic1).subscriptionName("my-subscriber-name")
+             .receiverQueueSize(5).subscribe();
+
+     PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topic1).get();
+     ManagedLedgerImpl ml = (ManagedLedgerImpl) topic.getManagedLedger();
+     ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().iterator().next();
+     Field configField = ManagedCursorImpl.class.getDeclaredField("config");
+     configField.setAccessible(true);
+     // Force frequent rollover so that the messages below are spread over multiple data ledgers.
+     // Named mlConfig so it does not shadow the inherited broker "config" field used above.
+     ManagedLedgerConfig mlConfig = (ManagedLedgerConfig) configField.get(cursor);
+     mlConfig.setMaxEntriesPerLedger(entriesPerLedger);
+     mlConfig.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
+     // Grab the managed ledger's bookkeeper client via reflection
+     Field bookKeeperField = ManagedLedgerImpl.class.getDeclaredField("bookKeeper");
+     bookKeeperField.setAccessible(true);
+     BookKeeper bookKeeper = (BookKeeper) bookKeeperField.get(ml);
+
+     // (1) publish messages in 5 data-ledgers each with 20 entries under managed-ledger
+     Producer<byte[]> producer = client.newProducer().topic(topic1).create();
+     for (int i = 0; i < totalMessages; i++) {
+         String message = "my-message-" + i;
+         producer.send(message.getBytes());
+     }
+
+     // validate: consumer is able to consume msg and close consumer after reading 1 entry
+     Assert.assertNotNull(consumer.receive(1, TimeUnit.SECONDS));
+     consumer.close();
+
+     NavigableMap<Long, LedgerInfo> ledgerInfo = ml.getLedgersInfo();
+     Assert.assertEquals(ledgerInfo.size(), totalDataLedgers);
+     Entry<Long, LedgerInfo> lastLedger = ledgerInfo.lastEntry();
+     // The last ledger is the only one left in BookKeeper after step (2), so it is the
+     // ledger whose removal by truncation is checked further down.
+     long firstLedgerToDelete = lastLedger.getKey();
+
+     // (2) delete first 4 data-ledgers
+     ledgerInfo.entrySet().forEach(entry -> {
+         if (!entry.equals(lastLedger)) {
+             assertEquals(entry.getValue().getEntries(), entriesPerLedger);
+             try {
+                 bookKeeper.deleteLedger(entry.getKey());
+             } catch (Exception e) {
+                 log.warn("failed to delete ledger {}", entry.getKey(), e);
+             }
+         }
+     });
+
+     // create 5 more ledgers
+     for (int i = 0; i < totalMessages; i++) {
+         String message = "my-message2-" + i;
+         producer.send(message.getBytes());
+     }
+
+     ml.delete();
+
+     // Admin should be able to truncate the topic
+     admin.topics().truncate(topic1);
+
+     ledgerInfo.entrySet().forEach(entry -> {
+         log.warn("found ledger: {}", entry.getKey());
+         assertNotEquals(firstLedgerToDelete, entry.getKey());
+     });
+
+     // Currently, ledger deletion is async and failed deletion
+     // does not actually fail truncation but logs an exception
+     // and creates scheduled task to retry
+     Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
+         LedgerMetadata meta = bookKeeper
+                 .getLedgerMetadata(firstLedgerToDelete)
+                 .exceptionally(e -> null)
+                 .get();
+         Assert.assertNull(meta, "ledger should be deleted " + firstLedgerToDelete);
+     });
+
+     // Should not throw, deleting absent ledger must be a noop
+     // unless PulsarLedgerManager returned a wrong error which
+     // got translated to BKUnexpectedConditionException
+     try {
+         bookKeeper.deleteLedger(firstLedgerToDelete);
+     } catch (BKException.BKNoSuchLedgerExistsOnMetadataServerException bke) {
+         // pass
+     }
+
+     producer.close();
+     // NOTE(review): the consumer was already closed after step (1); this second close looks redundant.
+     consumer.close();
+ }
+
+ /**
+  * Deletes a managed ledger through the factory after its newest BookKeeper ledger has already
+  * been removed directly in BookKeeper; the factory-level delete is expected not to throw.
+  */
+ @Test
+ public void testDeleteLedgerFactoryCorruptLedger() throws Exception {
+     ManagedLedgerFactoryImpl mlFactory = (ManagedLedgerFactoryImpl) pulsar.getManagedLedgerFactory();
+     ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) mlFactory.open("test");
+
+     // Reach into the managed ledger for its BookKeeper client
+     Field bkClientField = ManagedLedgerImpl.class.getDeclaredField("bookKeeper");
+     bkClientField.setAccessible(true);
+     BookKeeper bkClient = (BookKeeper) bkClientField.get(managedLedger);
+
+     // Write one entry, then remember the id of the newest ledger
+     managedLedger.addEntry("dummy-entry-1".getBytes());
+     NavigableMap<Long, LedgerInfo> ledgers = managedLedger.getLedgersInfo();
+     long newestLedgerId = ledgers.lastEntry().getKey();
+
+     // Remove that ledger behind the managed ledger's back
+     managedLedger.close();
+     bkClient.deleteLedger(newestLedgerId);
+
+     // BK ledger is already gone; the factory delete should not throw
+     mlFactory.delete("test");
+ }
+
@Test(timeOut = 20000)
public void testTopicWithWildCardChar() throws Exception {
@Cleanup
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerManager.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerManager.java
index 0d4f7a6e114..cf6699a5b30 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerManager.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerManager.java
@@ -24,8 +24,10 @@ import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -89,6 +91,32 @@ public class PulsarLedgerManager implements LedgerManager {
store.registerListener(this::handleDataNotification);
}
+ private static Throwable mapToBkException(Throwable ex) {
+ if (ex instanceof CompletionException || ex instanceof ExecutionException) {
+ return mapToBkException(ex.getCause());
+ }
+
+ if (ex instanceof MetadataStoreException.NotFoundException) {
+ BKException bke = BKException.create(BKException.Code.NoSuchLedgerExistsOnMetadataServerException);
+ bke.initCause(ex);
+ return bke;
+ } else if (ex instanceof MetadataStoreException.AlreadyExistsException) {
+ BKException bke = BKException.create(BKException.Code.LedgerExistException);
+ bke.initCause(ex);
+ return bke;
+ } else if (ex instanceof MetadataStoreException.BadVersionException) {
+ BKException bke = BKException.create(BKException.Code.MetadataVersionException);
+ bke.initCause(ex);
+ return bke;
+ } else if (ex instanceof MetadataStoreException.AlreadyClosedException) {
+ BKException bke = BKException.create(BKException.Code.LedgerClosedException);
+ bke.initCause(ex);
+ return bke;
+ }
+
+ return ex;
+ }
+
@Override
public CompletableFuture<Versioned<LedgerMetadata>> createLedgerMetadata(long ledgerId,
LedgerMetadata inputMetadata) {
@@ -106,14 +134,21 @@ public class PulsarLedgerManager implements LedgerManager {
return FutureUtil.failedFuture(new BKException.BKMetadataSerializationException(ioe));
}
- CompletableFuture<Versioned<LedgerMetadata>> future = store.put(getLedgerPath(ledgerId), data, Optional.of(-1L))
- .thenApply(stat -> new Versioned(metadata, new LongVersion(stat.getVersion())));
- future.exceptionally(ex -> {
- log.error("Failed to create ledger {}: {}", ledgerId, ex.getMessage());
- return null;
- });
+ CompletableFuture<Versioned<LedgerMetadata>> promise = new CompletableFuture<>();
- return future;
+ store.put(getLedgerPath(ledgerId), data, Optional.of(-1L))
+ .whenComplete((stat, ex) -> {
+ if (ex != null) {
+ log.error("Failed to create ledger {}: {}", ledgerId, ex.getMessage());
+ promise.completeExceptionally(mapToBkException(ex));
+ return;
+ }
+
+ Versioned<LedgerMetadata> result = new Versioned(metadata, new LongVersion(stat.getVersion()));
+ promise.complete(result);
+ });
+
+ return promise;
}
@Override
@@ -131,9 +166,17 @@ public class PulsarLedgerManager implements LedgerManager {
}
}
- return store.delete(getLedgerPath(ledgerId), existingVersion)
- .thenRun(() -> {
- // removed listener on ledgerId
+ CompletableFuture<Void> promise = new CompletableFuture<>();
+ store.delete(getLedgerPath(ledgerId), existingVersion)
+ .whenComplete((result, ex) -> {
+ if (ex != null) {
+ log.error("Failed to remove ledger metadata {}: {}", ledgerId, ex.getMessage());
+ promise.completeExceptionally(mapToBkException(ex));
+ return;
+ }
+
+ promise.complete(result);
+ // remove listener on ledgerId
Set<BookkeeperInternalCallbacks.LedgerMetadataListener> listenerSet = listeners.remove(ledgerId);
if (null != listenerSet) {
if (log.isDebugEnabled()) {
@@ -148,6 +191,8 @@ public class PulsarLedgerManager implements LedgerManager {
}
}
});
+
+ return promise;
}
@Override