You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2023/02/03 01:48:40 UTC

[pulsar] branch branch-2.9 updated: [branch-2.9][cherry-pick] Close TransactionBuffer when MessageDeduplication#checkStatus failed (#19287)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new d4f8c40ddde [branch-2.9][cherry-pick] Close TransactionBuffer when MessageDeduplication#checkStatus failed (#19287)
d4f8c40ddde is described below

commit d4f8c40ddde009fdacd3b484ce3055d609f16bc9
Author: Tao Jiuming <95...@users.noreply.github.com>
AuthorDate: Fri Feb 3 09:48:31 2023 +0800

    [branch-2.9][cherry-pick] Close TransactionBuffer when MessageDeduplication#checkStatus failed (#19287)
---
 .../pulsar/broker/service/BrokerService.java       | 19 +++++++-
 .../buffer/TopicTransactionBufferTest.java         | 52 ++++++++++++++++++++++
 2 files changed, 70 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 10ae4fa118f..58c5987dfa0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1325,6 +1325,12 @@ public class BrokerService implements Closeable {
                 });
     }
 
+    @VisibleForTesting // test-only public wrapper around the private createPersistentTopic
+    public void createPersistentTopic0(final String topic, boolean createIfMissing,
+                                       CompletableFuture<Optional<Topic>> topicFuture) {
+        createPersistentTopic(topic, createIfMissing, topicFuture); // delegates with the arguments unchanged
+    }
+
     private void createPersistentTopic(final String topic, boolean createIfMissing,
                                        CompletableFuture<Optional<Topic>> topicFuture) {
         final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
@@ -1367,7 +1373,7 @@ public class BrokerService implements Closeable {
                             try {
                                 PersistentTopic persistentTopic = isSystemTopic(topic)
                                         ? new SystemTopic(topic, ledger, BrokerService.this)
-                                        : new PersistentTopic(topic, ledger, BrokerService.this);
+                                        : newPersistentTopic(topic, ledger, BrokerService.this);
                                 CompletableFuture<Void> preCreateSubForCompaction =
                                         persistentTopic.preCreateSubscriptionForCompactionIfNeeded();
                                 CompletableFuture<Void> replicationFuture = persistentTopic
@@ -1400,6 +1406,12 @@ public class BrokerService implements Closeable {
                                             "Replication or dedup check failed."
                                                     + " Removing topic from topics list {}, {}",
                                             topic, ex);
+                                    persistentTopic.getTransactionBuffer()
+                                            .closeAsync()
+                                            .exceptionally(t -> {
+                                                log.error("[{}] Close transactionBuffer failed", topic, t);
+                                                return null;
+                                            });
                                     persistentTopic.stopReplProducers().whenCompleteAsync((v, exception) -> {
                                         topics.remove(topic, topicFuture);
                                         topicFuture.completeExceptionally(ex);
@@ -2947,6 +2959,11 @@ public class BrokerService implements Closeable {
         return pausedConnections.longValue();
     }
 
+    @VisibleForTesting // factory seam: createPersistentTopic builds non-system topics here so tests can stub it
+    public PersistentTopic newPersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerService){
+        return new PersistentTopic(topic, ledger, brokerService); // no extra logic: plain constructor call
+    }
+
     @VisibleForTesting
     public void setPulsarChannelInitializerFactory(PulsarChannelInitializer.Factory factory) {
         this.pulsarChannelInitFactory = factory;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
index 576ef647248..7f1a5a8042a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
@@ -18,9 +18,16 @@
  */
 package org.apache.pulsar.broker.transaction.buffer;
 
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.transaction.TransactionTestBase;
+import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
+import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.common.naming.TopicName;
@@ -30,11 +37,17 @@ import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
 import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
 import org.awaitility.Awaitility;
 import org.powermock.reflect.Whitebox;
+import org.mockito.Mockito;
+import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
+import java.util.Collections;
 import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 public class TopicTransactionBufferTest extends TransactionTestBase {
 
@@ -86,4 +99,43 @@ public class TopicTransactionBufferTest extends TransactionTestBase {
         Whitebox.setInternalState(persistentTopic.getManagedLedger(), "state", ManagedLedgerImpl.State.WriteFailed);
         txn.commit().get();
     }
+
+    @Test
+    public void testCheckDeduplicationFailedWhenCreatePersistentTopic() throws Exception {
+        String topic = "persistent://" + NAMESPACE1 + "/test_" + UUID.randomUUID();
+        PulsarService pulsar = pulsarServiceList.get(0);
+        BrokerService brokerService0 = pulsar.getBrokerService();
+        BrokerService brokerService = Mockito.spy(brokerService0);
+        AtomicReference<PersistentTopic> reference = new AtomicReference<>();
+        // Stub the topic factory so the created topic's deduplication check always fails.
+        Mockito
+                .doAnswer(inv -> {
+                    String topic1 = inv.getArgument(0);
+                    ManagedLedger ledger = inv.getArgument(1);
+                    BrokerService service = inv.getArgument(2);
+                    if (TopicName.get(topic1).isPersistent()) {
+                        PersistentTopic pt = Mockito.spy(new PersistentTopic(topic1, ledger, service));
+                        CompletableFuture<Void> f = new CompletableFuture<>();
+                        f.completeExceptionally(new ManagedLedgerException("This is an exception"));
+                        Mockito.doReturn(f).when(pt).checkDeduplicationStatus();
+                        reference.set(pt);
+                        return pt;
+                    } else {
+                        return new NonPersistentTopic(topic1, service);
+                    }
+                })
+                .when(brokerService)
+                .newPersistentTopic(Mockito.eq(topic), Mockito.any(), Mockito.eq(brokerService));
+        // Trigger creation; the result future is not inspected, only the transaction buffer state is.
+        brokerService.createPersistentTopic0(topic, true, new CompletableFuture<>());
+        // `reference` is set inside the factory stub, i.e. before the dedup-failure handling has run.
+        Awaitility.waitAtMost(1, TimeUnit.MINUTES).until(() -> reference.get() != null);
+        PersistentTopic persistentTopic = reference.get();
+        TransactionBuffer buffer = persistentTopic.getTransactionBuffer();
+        Assert.assertTrue(buffer instanceof TopicTransactionBuffer);
+        TopicTransactionBuffer ttb = (TopicTransactionBuffer) buffer;
+        TopicTransactionBufferState.State expectState = TopicTransactionBufferState.State.Close;
+        Awaitility.await().untilAsserted(() -> Assert.assertEquals(ttb.getState(), expectState));
+    }
+
 }