You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/08/08 09:04:31 UTC

[pulsar] 27/33: [fix][broker] PulsarLedgerManager to pass correct error code to BK client (#16857)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 796dd629e3cd3ec35a71fad2da3db71328798089
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Tue Aug 2 21:37:46 2022 -0700

    [fix][broker] PulsarLedgerManager to pass correct error code to BK client (#16857)
    
    (cherry picked from commit 2e8bd3d7b17190d6fb45d5b35eff598948975385)
---
 .../mledger/impl/ManagedLedgerFactoryImpl.java     |  29 +++-
 .../broker/service/BrokerBkEnsemblesTests.java     | 150 ++++++++++++++++++++-
 .../metadata/bookkeeper/PulsarLedgerManager.java   |  65 +++++++--
 3 files changed, 231 insertions(+), 13 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index d829efad87f..e437bdaa47c 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -883,7 +883,19 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
             DeleteLedgerCallback callback, Object ctx) {
         Futures.waitForAll(info.ledgers.stream()
                 .filter(li -> !li.isOffloaded)
-                .map(li -> bkc.newDeleteLedgerOp().withLedgerId(li.ledgerId).execute())
+                .map(li -> bkc.newDeleteLedgerOp().withLedgerId(li.ledgerId).execute()
+                        .handle((result, ex) -> {
+                            if (ex != null) {
+                                int rc = BKException.getExceptionCode(ex);
+                                if (rc == BKException.Code.NoSuchLedgerExistsOnMetadataServerException
+                                    || rc == BKException.Code.NoSuchLedgerExistsException) {
+                                    log.info("Ledger {} does not exist, ignoring", li.ledgerId);
+                                    return null;
+                                }
+                                throw new CompletionException(ex);
+                            }
+                            return result;
+                        }))
                 .collect(Collectors.toList()))
                 .thenRun(() -> {
                     // Delete the metadata
@@ -911,7 +923,20 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
 
         // Delete the cursor ledger if present
         if (cursor.cursorsLedgerId != -1) {
-            cursorLedgerDeleteFuture = bkc.newDeleteLedgerOp().withLedgerId(cursor.cursorsLedgerId).execute();
+            cursorLedgerDeleteFuture = bkc.newDeleteLedgerOp().withLedgerId(cursor.cursorsLedgerId)
+                    .execute()
+                    .handle((result, ex) -> {
+                        if (ex != null) {
+                            int rc = BKException.getExceptionCode(ex);
+                            if (rc == BKException.Code.NoSuchLedgerExistsOnMetadataServerException
+                                    || rc == BKException.Code.NoSuchLedgerExistsException) {
+                                log.info("Ledger {} does not exist, ignoring", cursor.cursorsLedgerId);
+                                return null;
+                            }
+                            throw new CompletionException(ex);
+                        }
+                        return result;
+                    });
         } else {
             cursorLedgerDeleteFuture = CompletableFuture.completedFuture(null);
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
index aa63b224a9d..d9ea5355dd3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.service;
 
 import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.fail;
 
 import java.lang.reflect.Field;
@@ -31,9 +32,13 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.collect.Sets;
 import lombok.Cleanup;
 
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
@@ -47,11 +52,13 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.zookeeper.ZooKeeper;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker")
+@Slf4j
 public class BrokerBkEnsemblesTests extends BkEnsemblesTestBase {
 
     public BrokerBkEnsemblesTests() {
@@ -235,7 +242,7 @@ public class BrokerBkEnsemblesTests extends BkEnsemblesTestBase {
                 try {
                     bookKeeper.deleteLedger(entry.getKey());
                 } catch (Exception e) {
-                    e.printStackTrace();
+                    log.warn("failed to delete ledger {}", entry.getKey(), e);
                 }
             }
         });
@@ -276,6 +283,147 @@ public class BrokerBkEnsemblesTests extends BkEnsemblesTestBase {
         consumer.close();
     }
 
+    @Test
+    public void testTruncateCorruptDataLedger() throws Exception {
+        // Ensure intended state for autoSkipNonRecoverableData
+        admin.brokers().updateDynamicConfiguration("autoSkipNonRecoverableData", "false");
+
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getWebServiceAddress())
+                .statsInterval(0, TimeUnit.SECONDS)
+                .build();
+
+        final int totalMessages = 100;
+        final int totalDataLedgers = 5;
+        final int entriesPerLedger = totalMessages / totalDataLedgers;
+
+        final String tenant = "prop";
+        try {
+            admin.tenants().createTenant(tenant, new TenantInfoImpl(Sets.newHashSet("role1", "role2"),
+                    Sets.newHashSet(config.getClusterName())));
+        } catch (Exception e) {
+
+        }
+        final String ns1 = tenant + "/crash-broker";
+        try {
+            admin.namespaces().createNamespace(ns1, Sets.newHashSet(config.getClusterName()));
+        } catch (Exception e) {
+
+        }
+
+        final String topic1 = "persistent://" + ns1 + "/my-topic-" + System.currentTimeMillis();
+
+        // Create subscription
+        Consumer<byte[]> consumer = client.newConsumer().topic(topic1).subscriptionName("my-subscriber-name")
+                .receiverQueueSize(5).subscribe();
+
+        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topic1).get();
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) topic.getManagedLedger();
+        ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().iterator().next();
+        Field configField = ManagedCursorImpl.class.getDeclaredField("config");
+        configField.setAccessible(true);
+        // Create multiple data-ledger
+        ManagedLedgerConfig config = (ManagedLedgerConfig) configField.get(cursor);
+        config.setMaxEntriesPerLedger(entriesPerLedger);
+        config.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
+        // bookkeeper client
+        Field bookKeeperField = ManagedLedgerImpl.class.getDeclaredField("bookKeeper");
+        bookKeeperField.setAccessible(true);
+        // Read the BookKeeper client out of the managed ledger via reflection
+        BookKeeper bookKeeper = (BookKeeper) bookKeeperField.get(ml);
+
+        // (1) publish messages in 5 data-ledgers each with 20 entries under managed-ledger
+        Producer<byte[]> producer = client.newProducer().topic(topic1).create();
+        for (int i = 0; i < totalMessages; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+
+        // validate: consumer is able to consume msg and close consumer after reading 1 entry
+        Assert.assertNotNull(consumer.receive(1, TimeUnit.SECONDS));
+        consumer.close();
+
+        NavigableMap<Long, LedgerInfo> ledgerInfo = ml.getLedgersInfo();
+        Assert.assertEquals(ledgerInfo.size(), totalDataLedgers);
+        Entry<Long, LedgerInfo> lastLedger = ledgerInfo.lastEntry();
+        long firstLedgerToDelete = lastLedger.getKey();
+
+        // (2) delete first 4 data-ledgers
+        ledgerInfo.entrySet().forEach(entry -> {
+            if (!entry.equals(lastLedger)) {
+                assertEquals(entry.getValue().getEntries(), entriesPerLedger);
+                try {
+                    bookKeeper.deleteLedger(entry.getKey());
+                } catch (Exception e) {
+                    log.warn("failed to delete ledger {}", entry.getKey(), e);
+                }
+            }
+        });
+
+        // create 5 more ledgers
+        for (int i = 0; i < totalMessages; i++) {
+            String message = "my-message2-" + i;
+            producer.send(message.getBytes());
+        }
+
+        ml.delete();
+
+        // Admin should be able to truncate the topic
+        admin.topics().truncate(topic1);
+
+        ledgerInfo.entrySet().forEach(entry -> {
+            log.warn("found ledger: {}", entry.getKey());
+            assertNotEquals(firstLedgerToDelete, entry.getKey());
+        });
+
+        // Currently, ledger deletion is async: a failed deletion
+        // does not actually fail truncation, but logs an exception
+        // and creates a scheduled task to retry
+        Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
+            LedgerMetadata meta = bookKeeper
+                    .getLedgerMetadata(firstLedgerToDelete)
+                    .exceptionally(e -> null)
+                    .get();
+            assertEquals(null, meta, "ledger should be deleted " + firstLedgerToDelete);
+            });
+
+        // Should not throw: deleting an absent ledger must be a no-op,
+        // unless PulsarLedgerManager returned a wrong error which
+        // got translated to BKUnexpectedConditionException
+        try {
+            bookKeeper.deleteLedger(firstLedgerToDelete);
+        } catch (BKException.BKNoSuchLedgerExistsOnMetadataServerException bke) {
+            // pass
+        }
+
+        producer.close();
+        consumer.close();
+    }
+
+    @Test
+    public void testDeleteLedgerFactoryCorruptLedger() throws Exception {
+        ManagedLedgerFactoryImpl factory = (ManagedLedgerFactoryImpl) pulsar.getManagedLedgerFactory();
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open("test");
+
+        // bookkeeper client
+        Field bookKeeperField = ManagedLedgerImpl.class.getDeclaredField("bookKeeper");
+        bookKeeperField.setAccessible(true);
+        // Read the BookKeeper client out of the managed ledger via reflection
+        BookKeeper bookKeeper = (BookKeeper) bookKeeperField.get(ml);
+
+        ml.addEntry("dummy-entry-1".getBytes());
+
+        NavigableMap<Long, LedgerInfo> ledgerInfo = ml.getLedgersInfo();
+        long lastLedger = ledgerInfo.lastEntry().getKey();
+
+        ml.close();
+        bookKeeper.deleteLedger(lastLedger);
+
+        // BK ledger is deleted, factory should not throw on delete
+        factory.delete("test");
+    }
+
     @Test(timeOut = 20000)
     public void testTopicWithWildCardChar() throws Exception {
         @Cleanup
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerManager.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerManager.java
index 5d5854743b2..1d05f5726a5 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerManager.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerManager.java
@@ -24,8 +24,10 @@ import java.util.HashSet;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -89,6 +91,32 @@ public class PulsarLedgerManager implements LedgerManager {
         store.registerListener(this::handleDataNotification);
     }
 
+    private static Throwable mapToBkException(Throwable ex) {
+        if (ex instanceof CompletionException || ex instanceof ExecutionException) {
+            return mapToBkException(ex.getCause());
+        }
+
+        if (ex instanceof MetadataStoreException.NotFoundException) {
+            BKException bke = BKException.create(BKException.Code.NoSuchLedgerExistsOnMetadataServerException);
+            bke.initCause(ex);
+            return bke;
+        } else if (ex instanceof MetadataStoreException.AlreadyExistsException) {
+            BKException bke = BKException.create(BKException.Code.LedgerExistException);
+            bke.initCause(ex);
+            return bke;
+        } else if (ex instanceof MetadataStoreException.BadVersionException) {
+            BKException bke = BKException.create(BKException.Code.MetadataVersionException);
+            bke.initCause(ex);
+            return bke;
+        } else if (ex instanceof MetadataStoreException.AlreadyClosedException) {
+            BKException bke = BKException.create(BKException.Code.LedgerClosedException);
+            bke.initCause(ex);
+            return bke;
+        }
+
+        return ex;
+    }
+
     @Override
     public CompletableFuture<Versioned<LedgerMetadata>> createLedgerMetadata(long ledgerId,
                                                                              LedgerMetadata inputMetadata) {
@@ -106,14 +134,21 @@ public class PulsarLedgerManager implements LedgerManager {
             return FutureUtil.failedFuture(new BKException.BKMetadataSerializationException(ioe));
         }
 
-        CompletableFuture<Versioned<LedgerMetadata>> future = store.put(getLedgerPath(ledgerId), data, Optional.of(-1L))
-                .thenApply(stat -> new Versioned(metadata, new LongVersion(stat.getVersion())));
-        future.exceptionally(ex -> {
-            log.error("Failed to create ledger {}: {}", ledgerId, ex.getMessage());
-            return null;
-        });
+        CompletableFuture<Versioned<LedgerMetadata>> promise = new CompletableFuture<>();
 
-        return future;
+        store.put(getLedgerPath(ledgerId), data, Optional.of(-1L))
+                .whenComplete((stat, ex) -> {
+                    if (ex != null) {
+                        log.error("Failed to create ledger {}: {}", ledgerId, ex.getMessage());
+                        promise.completeExceptionally(mapToBkException(ex));
+                        return;
+                    }
+
+                    Versioned<LedgerMetadata> result = new Versioned(metadata, new LongVersion(stat.getVersion()));
+                    promise.complete(result);
+                });
+
+        return promise;
     }
 
     @Override
@@ -131,9 +166,17 @@ public class PulsarLedgerManager implements LedgerManager {
             }
         }
 
-        return store.delete(getLedgerPath(ledgerId), existingVersion)
-                .thenRun(() -> {
-                    // removed listener on ledgerId
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        store.delete(getLedgerPath(ledgerId), existingVersion)
+                .whenComplete((result, ex) -> {
+                    if (ex != null) {
+                        log.error("Failed to remove ledger metadata {}: {}", ledgerId, ex.getMessage());
+                        promise.completeExceptionally(mapToBkException(ex));
+                        return;
+                    }
+
+                    promise.complete(result);
+                    // remove listener on ledgerId
                     Set<BookkeeperInternalCallbacks.LedgerMetadataListener> listenerSet = listeners.remove(ledgerId);
                     if (null != listenerSet) {
                         if (log.isDebugEnabled()) {
@@ -148,6 +191,8 @@ public class PulsarLedgerManager implements LedgerManager {
                         }
                     }
                 });
+
+        return promise;
     }
 
     @Override