You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@pulsar.apache.org by te...@apache.org on 2023/02/08 09:34:10 UTC
[pulsar] branch branch-2.11 updated: [fix] Close TransactionBuffer when create persistent topic timeout (#19384)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 8053a46cd91 [fix] Close TransactionBuffer when create persistent topic timeout (#19384)
8053a46cd91 is described below
commit 8053a46cd91eaf3d4fcbdff16b9cfe39cbb0d146
Author: Tao Jiuming <95...@users.noreply.github.com>
AuthorDate: Mon Feb 6 15:32:59 2023 +0800
[fix] Close TransactionBuffer when create persistent topic timeout (#19384)
---
.../pulsar/broker/service/BrokerService.java | 7 ++++
.../buffer/TopicTransactionBufferTest.java | 41 +++++++++++++++++++++-
2 files changed, 47 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 6dc78736c9e..2262f3381e5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1456,8 +1456,15 @@ public class BrokerService implements Closeable {
- topicCreateTimeMs;
pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
if (topicFuture.isCompletedExceptionally()) {
+ // Check create persistent topic timeout.
log.warn("{} future is already completed with failure {}, closing the"
+ " topic", topic, FutureUtil.getException(topicFuture));
+ persistentTopic.getTransactionBuffer()
+ .closeAsync()
+ .exceptionally(t -> {
+ log.error("[{}] Close transactionBuffer failed", topic, t);
+ return null;
+ });
persistentTopic.stopReplProducers()
.whenCompleteAsync((v, exception) -> {
topics.remove(topic, topicFuture);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
index 04fc665b19d..86967464aa6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
@@ -24,6 +24,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
@@ -43,8 +44,11 @@ import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+
+import java.time.Duration;
import java.util.Collections;
import java.util.Map;
+import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -117,7 +121,7 @@ public class TopicTransactionBufferTest extends TransactionTestBase {
Class<?> topicKlass = inv.getArgument(3);
if (topicKlass.equals(PersistentTopic.class)) {
PersistentTopic pt = Mockito.spy(new PersistentTopic(topic1, ledger, service));
- CompletableFuture<Void> f =CompletableFuture
+ CompletableFuture<Void> f = CompletableFuture
.failedFuture(new ManagedLedgerException("This is an exception"));
Mockito.doReturn(f).when(pt).checkDeduplicationStatus();
reference.set(pt);
@@ -141,4 +145,39 @@ public class TopicTransactionBufferTest extends TransactionTestBase {
Assert.assertEquals(ttb.getState(), expectState);
}
+
+ @Test
+ public void testCloseTransactionBufferWhenTimeout() throws Exception {
+ String topic = "persistent://" + NAMESPACE1 + "/test_" + UUID.randomUUID();
+ PulsarService pulsar = pulsarServiceList.get(0);
+ BrokerService brokerService0 = pulsar.getBrokerService();
+ BrokerService brokerService = Mockito.spy(brokerService0);
+ AtomicReference<PersistentTopic> reference = new AtomicReference<>();
+ pulsar.getConfiguration().setTopicLoadTimeoutSeconds(10);
+ long topicLoadTimeout = TimeUnit.SECONDS.toMillis(pulsar.getConfiguration().getTopicLoadTimeoutSeconds() + 1);
+
+ Mockito
+ .doAnswer(inv -> {
+ Thread.sleep(topicLoadTimeout);
+ PersistentTopic persistentTopic = (PersistentTopic) inv.callRealMethod();
+ reference.set(persistentTopic);
+ return persistentTopic;
+ })
+ .when(brokerService)
+ .newTopic(Mockito.eq(topic), Mockito.any(), Mockito.eq(brokerService),
+ Mockito.eq(PersistentTopic.class));
+
+ CompletableFuture<Optional<Topic>> f = brokerService.getTopic(topic, true);
+
+ Awaitility.waitAtMost(20, TimeUnit.SECONDS)
+ .pollInterval(Duration.ofSeconds(2)).until(() -> reference.get() != null);
+ PersistentTopic persistentTopic = reference.get();
+ TransactionBuffer buffer = persistentTopic.getTransactionBuffer();
+ Assert.assertTrue(buffer instanceof TopicTransactionBuffer);
+ TopicTransactionBuffer ttb = (TopicTransactionBuffer) buffer;
+ TopicTransactionBufferState.State expectState = TopicTransactionBufferState.State.Close;
+ Assert.assertEquals(ttb.getState(), expectState);
+ Assert.assertTrue(f.isCompletedExceptionally());
+ }
+
}