You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2023/02/08 09:34:10 UTC

[pulsar] branch branch-2.11 updated: [fix] Close TransactionBuffer when create persistent topic timeout (#19384)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 8053a46cd91 [fix] Close TransactionBuffer when create persistent topic timeout (#19384)
8053a46cd91 is described below

commit 8053a46cd91eaf3d4fcbdff16b9cfe39cbb0d146
Author: Tao Jiuming <95...@users.noreply.github.com>
AuthorDate: Mon Feb 6 15:32:59 2023 +0800

    [fix] Close TransactionBuffer when create persistent topic timeout (#19384)
---
 .../pulsar/broker/service/BrokerService.java       |  7 ++++
 .../buffer/TopicTransactionBufferTest.java         | 41 +++++++++++++++++++++-
 2 files changed, 47 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 6dc78736c9e..2262f3381e5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1456,8 +1456,15 @@ public class BrokerService implements Closeable {
                                                                         - topicCreateTimeMs;
                                             pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
                                             if (topicFuture.isCompletedExceptionally()) {
+                                                // Check create persistent topic timeout.
                                                 log.warn("{} future is already completed with failure {}, closing the"
                                                         + " topic", topic, FutureUtil.getException(topicFuture));
+                                                persistentTopic.getTransactionBuffer()
+                                                        .closeAsync()
+                                                        .exceptionally(t -> {
+                                                            log.error("[{}] Close transactionBuffer failed", topic, t);
+                                                            return null;
+                                                        });
                                                 persistentTopic.stopReplProducers()
                                                         .whenCompleteAsync((v, exception) -> {
                                                             topics.remove(topic, topicFuture);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
index 04fc665b19d..86967464aa6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
@@ -24,6 +24,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.transaction.TransactionTestBase;
@@ -43,8 +44,11 @@ import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
+
+import java.time.Duration;
 import java.util.Collections;
 import java.util.Map;
+import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -117,7 +121,7 @@ public class TopicTransactionBufferTest extends TransactionTestBase {
                     Class<?> topicKlass = inv.getArgument(3);
                     if (topicKlass.equals(PersistentTopic.class)) {
                         PersistentTopic pt = Mockito.spy(new PersistentTopic(topic1, ledger, service));
-                        CompletableFuture<Void> f =CompletableFuture
+                        CompletableFuture<Void> f = CompletableFuture
                                 .failedFuture(new ManagedLedgerException("This is an exception"));
                         Mockito.doReturn(f).when(pt).checkDeduplicationStatus();
                         reference.set(pt);
@@ -141,4 +145,39 @@ public class TopicTransactionBufferTest extends TransactionTestBase {
         Assert.assertEquals(ttb.getState(), expectState);
     }
 
+    /** Expects the transaction buffer to be closed when topic creation outlasts the topic load timeout. */
+    @Test
+    public void testCloseTransactionBufferWhenTimeout() throws Exception {
+        String topic = "persistent://" + NAMESPACE1 + "/test_" + UUID.randomUUID();
+        PulsarService pulsar = pulsarServiceList.get(0);
+        BrokerService brokerService0 = pulsar.getBrokerService();
+        BrokerService brokerService = Mockito.spy(brokerService0);
+        AtomicReference<PersistentTopic> reference = new AtomicReference<>();
+        pulsar.getConfiguration().setTopicLoadTimeoutSeconds(10);
+        long topicLoadTimeout = TimeUnit.SECONDS.toMillis(pulsar.getConfiguration().getTopicLoadTimeoutSeconds() + 1);
+        // Stub newTopic for this topic only: stall, then build the real topic and capture it for the assertions.
+        Mockito
+                .doAnswer(inv -> {
+                    Thread.sleep(topicLoadTimeout); // one second longer than the configured load timeout
+                    PersistentTopic persistentTopic = (PersistentTopic) inv.callRealMethod();
+                    reference.set(persistentTopic);
+                    return persistentTopic;
+                })
+                .when(brokerService)
+                .newTopic(Mockito.eq(topic), Mockito.any(), Mockito.eq(brokerService),
+                        Mockito.eq(PersistentTopic.class));
+
+        CompletableFuture<Optional<Topic>> f = brokerService.getTopic(topic, true);
+        // Poll every 2s (at most 20s) until the stalled newTopic stub has published the created topic.
+        Awaitility.waitAtMost(20, TimeUnit.SECONDS)
+                .pollInterval(Duration.ofSeconds(2)).until(() -> reference.get() != null);
+        PersistentTopic persistentTopic = reference.get();
+        TransactionBuffer buffer = persistentTopic.getTransactionBuffer();
+        Assert.assertTrue(buffer instanceof TopicTransactionBuffer);
+        TopicTransactionBuffer ttb = (TopicTransactionBuffer) buffer;
+        TopicTransactionBufferState.State expectState = TopicTransactionBufferState.State.Close;
+        Assert.assertEquals(ttb.getState(), expectState);
+        Assert.assertTrue(f.isCompletedExceptionally()); // the lookup future itself must have failed
+    }
+
 }