You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2023/02/03 01:48:40 UTC
[pulsar] branch branch-2.9 updated: [branch-2.9][cherry-pick] Close TransactionBuffer when MessageDeduplication#checkStatus failed (#19287)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new d4f8c40ddde [branch-2.9][cherry-pick] Close TransactionBuffer when MessageDeduplication#checkStatus failed (#19287)
d4f8c40ddde is described below
commit d4f8c40ddde009fdacd3b484ce3055d609f16bc9
Author: Tao Jiuming <95...@users.noreply.github.com>
AuthorDate: Fri Feb 3 09:48:31 2023 +0800
[branch-2.9][cherry-pick] Close TransactionBuffer when MessageDeduplication#checkStatus failed (#19287)
---
.../pulsar/broker/service/BrokerService.java | 19 +++++++-
.../buffer/TopicTransactionBufferTest.java | 52 ++++++++++++++++++++++
2 files changed, 70 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 10ae4fa118f..58c5987dfa0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1325,6 +1325,12 @@ public class BrokerService implements Closeable {
});
}
+ @VisibleForTesting // public entry point added so tests can reach the private createPersistentTopic below
+ public void createPersistentTopic0(final String topic, boolean createIfMissing,
+ CompletableFuture<Optional<Topic>> topicFuture) {
+ createPersistentTopic(topic, createIfMissing, topicFuture); // forwards all three arguments unchanged
+ }
+
private void createPersistentTopic(final String topic, boolean createIfMissing,
CompletableFuture<Optional<Topic>> topicFuture) {
final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
@@ -1367,7 +1373,7 @@ public class BrokerService implements Closeable {
try {
PersistentTopic persistentTopic = isSystemTopic(topic)
? new SystemTopic(topic, ledger, BrokerService.this)
- : new PersistentTopic(topic, ledger, BrokerService.this);
+ : newPersistentTopic(topic, ledger, BrokerService.this);
CompletableFuture<Void> preCreateSubForCompaction =
persistentTopic.preCreateSubscriptionForCompactionIfNeeded();
CompletableFuture<Void> replicationFuture = persistentTopic
@@ -1400,6 +1406,12 @@ public class BrokerService implements Closeable {
"Replication or dedup check failed."
+ " Removing topic from topics list {}, {}",
topic, ex);
+ persistentTopic.getTransactionBuffer()
+ .closeAsync()
+ .exceptionally(t -> {
+ log.error("[{}] Close transactionBuffer failed", topic, t);
+ return null;
+ });
persistentTopic.stopReplProducers().whenCompleteAsync((v, exception) -> {
topics.remove(topic, topicFuture);
topicFuture.completeExceptionally(ex);
@@ -2947,6 +2959,11 @@ public class BrokerService implements Closeable {
return pausedConnections.longValue();
}
+ @VisibleForTesting // factory hook: createPersistentTopic now calls this instead of `new PersistentTopic(...)`, so a spied BrokerService can substitute the instance
+ public PersistentTopic newPersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerService){
+ return new PersistentTopic(topic, ledger, brokerService); // production behaviour: plain construction, no extra logic
+ }
+
@VisibleForTesting
public void setPulsarChannelInitializerFactory(PulsarChannelInitializer.Factory factory) {
this.pulsarChannelInitFactory = factory;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
index 576ef647248..7f1a5a8042a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
@@ -18,9 +18,16 @@
*/
package org.apache.pulsar.broker.transaction.buffer;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
+import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
+import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.common.naming.TopicName;
@@ -30,11 +37,17 @@ import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
import org.awaitility.Awaitility;
import org.powermock.reflect.Whitebox;
+import org.mockito.Mockito;
+import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import java.util.Collections;
import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
public class TopicTransactionBufferTest extends TransactionTestBase {
@@ -86,4 +99,43 @@ public class TopicTransactionBufferTest extends TransactionTestBase {
Whitebox.setInternalState(persistentTopic.getManagedLedger(), "state", ManagedLedgerImpl.State.WriteFailed);
txn.commit().get();
}
+
+ @Test // expects the topic's TransactionBuffer to be in State.Close after checkDeduplicationStatus() fails during topic creation
+ public void testCheckDeduplicationFailedWhenCreatePersistentTopic() throws Exception {
+ String topic = "persistent://" + NAMESPACE1 + "/test_" + UUID.randomUUID();
+ PulsarService pulsar = pulsarServiceList.get(0);
+ BrokerService brokerService0 = pulsar.getBrokerService();
+ BrokerService brokerService = Mockito.spy(brokerService0); // spy so that newPersistentTopic(...) can be stubbed below
+ AtomicReference<PersistentTopic> reference = new AtomicReference<>(); // hands the topic built inside the stub back to the test body
+
+ Mockito
+ .doAnswer(inv -> {
+ String topic1 = inv.getArgument(0);
+ ManagedLedger ledger = inv.getArgument(1);
+ BrokerService service = inv.getArgument(2);
+ if (TopicName.get(topic1).isPersistent()) {
+ PersistentTopic pt = Mockito.spy(new PersistentTopic(topic1, ledger, service)); // real topic, spied so one method can be overridden
+ CompletableFuture<Void> f = new CompletableFuture<>();
+ f.completeExceptionally(new ManagedLedgerException("This is an exception")); // already-failed future simulating a dedup check failure
+ Mockito.doReturn(f).when(pt).checkDeduplicationStatus();
+ reference.set(pt);
+ return pt;
+ } else {
+ return new NonPersistentTopic(topic1, service); // not taken in this test: the stub only matches the "persistent://" topic above
+ }
+ })
+ .when(brokerService)
+ .newPersistentTopic(Mockito.eq(topic), Mockito.any(), Mockito.eq(brokerService)); // only calls for this exact topic name are intercepted
+
+ brokerService.createPersistentTopic0(topic, true, new CompletableFuture<>()); // the topic future is not inspected by this test
+
+ Awaitility.waitAtMost(1, TimeUnit.MINUTES).until(() -> reference.get() != null); // NOTE(review): waits only for the topic object to be built; closeAsync() is invoked later by the failure handler — confirm the state assertion below cannot race
+ PersistentTopic persistentTopic = reference.get();
+ TransactionBuffer buffer = persistentTopic.getTransactionBuffer();
+ Assert.assertTrue(buffer instanceof TopicTransactionBuffer);
+ TopicTransactionBuffer ttb = (TopicTransactionBuffer) buffer;
+ TopicTransactionBufferState.State expectState = TopicTransactionBufferState.State.Close;
+ Assert.assertEquals(ttb.getState(), expectState); // TestNG argument order: (actual, expected)
+ }
+
}