You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/03/18 07:59:38 UTC

[pulsar] 16/16: [Transaction] Fix transaction buffer recover BrokerMetadataException close topic (#14709)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1fae42d723bcb0515c593ba1cedb0cc9d67578d3
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Fri Mar 18 11:06:47 2022 +0800

    [Transaction] Fix transaction buffer recover BrokerMetadataException close topic (#14709)
    
    ### Motivation
    When TopicTransactionBuffer recover fail throw BrokerMetadataException, we should close this topic, if we don't close the topic, we can't send message because TopicTransactionBuffer recover fail
    
    ![image](https://user-images.githubusercontent.com/39078850/158532983-993c0303-4051-4e56-90e1-c6ce89fa3775.png)
    
    ### Modifications
    When recover fail by BrokerMetadataException, close topic
    ### Verifying this change
    add test for it
    
    (cherry picked from commit c4e4ddd1dae2249938c8ce15e5282301b167cd5e)
---
 .../buffer/impl/TopicTransactionBuffer.java        |  5 ++
 .../TopicTransactionBufferRecoverTest.java         | 64 +++++++++++++++++++++-
 .../pulsar/broker/transaction/TransactionTest.java |  3 +-
 3 files changed, 70 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index e0a6695..3b28966d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -179,6 +179,11 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
 
                     @Override
                     public void recoverExceptionally(Exception e) {
+                        if (e instanceof PulsarClientException.BrokerMetadataException) {
+                            log.warn("Closing topic {} due to read transaction buffer snapshot while recovering the "
+                                    + "transaction buffer throw exception", topic.getName(), e);
+                            topic.close();
+                        }
                         transactionBufferFuture.completeExceptionally(e);
                     }
                 }, this.topic, this));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
index 335cecc..5701d22 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
@@ -33,10 +33,14 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.commons.collections4.map.LinkedMap;
 import org.apache.commons.lang3.RandomUtils;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.service.AbstractTopic;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.TransactionBufferSnapshotService;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
+import org.apache.pulsar.broker.systopic.SystemTopicClient;
 import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
 import org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
 import org.apache.pulsar.client.api.Consumer;
@@ -44,6 +48,7 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.client.api.Schema;
@@ -60,7 +65,10 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
-
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
@@ -442,4 +450,58 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
         reader.close();
     }
 
+
+    @Test(timeOut=30000)
+    public void testTransactionBufferRecoverThrowBrokerMetadataException() throws Exception {
+        String topic = NAMESPACE1 + "/testTransactionBufferRecoverThrowBrokerMetadataException";
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient
+                .newProducer()
+                .topic(topic)
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .create();
+
+        Transaction txn = pulsarClient.newTransaction()
+                .withTransactionTimeout(5, TimeUnit.SECONDS)
+                .build().get();
+
+        producer.newMessage(txn).value("test".getBytes()).sendAsync();
+        producer.newMessage(txn).value("test".getBytes()).sendAsync();
+        txn.commit().get();
+
+        // take snapshot
+        PersistentTopic originalTopic = (PersistentTopic) getPulsarServiceList().get(0)
+                .getBrokerService().getTopic(TopicName.get(topic).toString(), false).get().get();
+        TransactionBufferSnapshotService transactionBufferSnapshotService =
+                mock(TransactionBufferSnapshotService.class);
+        SystemTopicClient.Reader<TransactionBufferSnapshot> reader = mock(SystemTopicClient.Reader.class);
+        // mock reader can't read snapshot fail
+        doThrow(new PulsarClientException.BrokerMetadataException("")).when(reader).hasMoreEvents();
+        doReturn(CompletableFuture.completedFuture(reader)).when(transactionBufferSnapshotService).createReader(any());
+
+        Field field = PulsarService.class.getDeclaredField("transactionBufferSnapshotService");
+        field.setAccessible(true);
+        TransactionBufferSnapshotService transactionBufferSnapshotServiceOriginal =
+                (TransactionBufferSnapshotService) field.get(getPulsarServiceList().get(0));
+        field.set(getPulsarServiceList().get(0), transactionBufferSnapshotService);
+
+        // recover again will throw BrokerMetadataException then close topic
+        new TopicTransactionBuffer(originalTopic);
+        Awaitility.await().untilAsserted(() -> {
+            // isFenced means closed
+            Field close = AbstractTopic.class.getDeclaredField("isFenced");
+            close.setAccessible(true);
+            assertTrue((boolean) close.get(originalTopic));
+        });
+        field.set(getPulsarServiceList().get(0), transactionBufferSnapshotServiceOriginal);
+
+        // topic recover success
+        txn = pulsarClient.newTransaction()
+                .withTransactionTimeout(5, TimeUnit.SECONDS)
+                .build().get();
+
+        producer.newMessage(txn).value("test".getBytes()).sendAsync();
+        txn.commit().get();
+    }
+
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 28d4fcf..2b1a2f4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -41,6 +41,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.List;
 import java.util.Optional;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
@@ -806,7 +807,7 @@ public class TransactionTest extends TransactionTestBase {
     public void testNotChangeMaxReadPositionAndAddAbortTimesWhenCheckIfNoSnapshot() throws Exception {
         PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0)
                 .getBrokerService()
-                .getTopic(NAMESPACE1 + "/test", true)
+                .getTopic(NAMESPACE1 + "/changeMaxReadPositionAndAddAbortTimes" + UUID.randomUUID(), true)
                 .get().get();
         TransactionBuffer buffer = persistentTopic.getTransactionBuffer();
         Field field = TopicTransactionBuffer.class.getDeclaredField("changeMaxReadPositionAndAddAbortTimes");