You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/03/18 07:59:38 UTC
[pulsar] 16/16: [Transaction] Fix transaction buffer recover BrokerMetadataException close topic (#14709)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 1fae42d723bcb0515c593ba1cedb0cc9d67578d3
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Fri Mar 18 11:06:47 2022 +0800
[Transaction] Fix transaction buffer recover BrokerMetadataException close topic (#14709)
### Motivation
When TopicTransactionBuffer recovery fails with a BrokerMetadataException, we should close the topic. If we don't close it, messages can no longer be sent to the topic because the TopicTransactionBuffer failed to recover.
![image](https://user-images.githubusercontent.com/39078850/158532983-993c0303-4051-4e56-90e1-c6ce89fa3775.png)
### Modifications
When recovery fails with a BrokerMetadataException, close the topic.
### Verifying this change
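Added a test for it (`TopicTransactionBufferRecoverTest#testTransactionBufferRecoverThrowBrokerMetadataException`).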
(cherry picked from commit c4e4ddd1dae2249938c8ce15e5282301b167cd5e)
---
.../buffer/impl/TopicTransactionBuffer.java | 5 ++
.../TopicTransactionBufferRecoverTest.java | 64 +++++++++++++++++++++-
.../pulsar/broker/transaction/TransactionTest.java | 3 +-
3 files changed, 70 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index e0a6695..3b28966d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -179,6 +179,11 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
@Override
public void recoverExceptionally(Exception e) {
+ if (e instanceof PulsarClientException.BrokerMetadataException) {
+ log.warn("Closing topic {} due to read transaction buffer snapshot while recovering the "
+ + "transaction buffer throw exception", topic.getName(), e);
+ topic.close();
+ }
transactionBufferFuture.completeExceptionally(e);
}
}, this.topic, this));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
index 335cecc..5701d22 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
@@ -33,10 +33,14 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.commons.collections4.map.LinkedMap;
import org.apache.commons.lang3.RandomUtils;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.TransactionBufferSnapshotService;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
+import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
import org.apache.pulsar.client.api.Consumer;
@@ -44,6 +48,7 @@ import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.Schema;
@@ -60,7 +65,10 @@ import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
-
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
@@ -442,4 +450,58 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
reader.close();
}
+
+ @Test(timeOut=30000)
+ public void testTransactionBufferRecoverThrowBrokerMetadataException() throws Exception {
+ String topic = NAMESPACE1 + "/testTransactionBufferRecoverThrowBrokerMetadataException";
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient
+ .newProducer()
+ .topic(topic)
+ .sendTimeout(0, TimeUnit.SECONDS)
+ .create();
+
+ Transaction txn = pulsarClient.newTransaction()
+ .withTransactionTimeout(5, TimeUnit.SECONDS)
+ .build().get();
+
+ producer.newMessage(txn).value("test".getBytes()).sendAsync();
+ producer.newMessage(txn).value("test".getBytes()).sendAsync();
+ txn.commit().get();
+
+ // take snapshot
+ PersistentTopic originalTopic = (PersistentTopic) getPulsarServiceList().get(0)
+ .getBrokerService().getTopic(TopicName.get(topic).toString(), false).get().get();
+ TransactionBufferSnapshotService transactionBufferSnapshotService =
+ mock(TransactionBufferSnapshotService.class);
+ SystemTopicClient.Reader<TransactionBufferSnapshot> reader = mock(SystemTopicClient.Reader.class);
+ // mock reader can't read snapshot fail
+ doThrow(new PulsarClientException.BrokerMetadataException("")).when(reader).hasMoreEvents();
+ doReturn(CompletableFuture.completedFuture(reader)).when(transactionBufferSnapshotService).createReader(any());
+
+ Field field = PulsarService.class.getDeclaredField("transactionBufferSnapshotService");
+ field.setAccessible(true);
+ TransactionBufferSnapshotService transactionBufferSnapshotServiceOriginal =
+ (TransactionBufferSnapshotService) field.get(getPulsarServiceList().get(0));
+ field.set(getPulsarServiceList().get(0), transactionBufferSnapshotService);
+
+ // recover again will throw BrokerMetadataException then close topic
+ new TopicTransactionBuffer(originalTopic);
+ Awaitility.await().untilAsserted(() -> {
+ // isFenced means closed
+ Field close = AbstractTopic.class.getDeclaredField("isFenced");
+ close.setAccessible(true);
+ assertTrue((boolean) close.get(originalTopic));
+ });
+ field.set(getPulsarServiceList().get(0), transactionBufferSnapshotServiceOriginal);
+
+ // topic recover success
+ txn = pulsarClient.newTransaction()
+ .withTransactionTimeout(5, TimeUnit.SECONDS)
+ .build().get();
+
+ producer.newMessage(txn).value("test".getBytes()).sendAsync();
+ txn.commit().get();
+ }
+
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 28d4fcf..2b1a2f4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -41,6 +41,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.List;
import java.util.Optional;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
@@ -806,7 +807,7 @@ public class TransactionTest extends TransactionTestBase {
public void testNotChangeMaxReadPositionAndAddAbortTimesWhenCheckIfNoSnapshot() throws Exception {
PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0)
.getBrokerService()
- .getTopic(NAMESPACE1 + "/test", true)
+ .getTopic(NAMESPACE1 + "/changeMaxReadPositionAndAddAbortTimes" + UUID.randomUUID(), true)
.get().get();
TransactionBuffer buffer = persistentTopic.getTransactionBuffer();
Field field = TopicTransactionBuffer.class.getDeclaredField("changeMaxReadPositionAndAddAbortTimes");