You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/02/10 04:04:35 UTC

[pulsar] 12/13: [Transaction] Fix send normal message can't change MaxReadPosition (#14192)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0b516b8672ee9987f8dac791717f170fd4391ca5
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Thu Feb 10 09:15:19 2022 +0800

    [Transaction] Fix send normal message can't change MaxReadPosition (#14192)
    
    link https://github.com/apache/pulsar/pull/14097
    
    When disable transaction producer connect to broker and the TopicTransactionBuffer is recovering, the TopicTransactionBuffer state is None or Initializing, then send normal message can't change the MaxReadPosition.
    If recover success and then producer don't send message to this topic. The maxReadPosition will not be change and consumer will not receive message when the disable transaction producer sent.
    
    1. recover to Ready state, if no ongoing txns, change maxReadPosition to LAC
    2. recover to NoSnapshot state, change maxReadPosition to LAC
    
    (cherry picked from commit 0287f7f4278c8bf892d869d81d5d1982be39a516)
---
 .../buffer/impl/TopicTransactionBuffer.java        |  38 +++++---
 .../buffer/TransactionStablePositionTest.java      | 100 +++++++++++++++++++--
 2 files changed, 119 insertions(+), 19 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index 12818b7..89c77d6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -104,25 +104,43 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
         this.takeSnapshotIntervalTime = topic.getBrokerService().getPulsar()
                 .getConfiguration().getTransactionBufferSnapshotMinTimeInMillis();
         this.maxReadPosition = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry();
+        this.recover();
+    }
+
+    private void recover() {
         this.topic.getBrokerService().getPulsar().getTransactionExecutorProvider().getExecutor(this)
                 .execute(new TopicTransactionBufferRecover(new TopicTransactionBufferRecoverCallBack() {
                     @Override
                     public void recoverComplete() {
-                        if (!changeToReadyState()) {
-                            log.error("[{}]Transaction buffer recover fail", topic.getName());
-                        } else {
-                            timer.newTimeout(TopicTransactionBuffer.this,
-                                    takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);
-                            transactionBufferFuture.complete(null);
+                        synchronized (TopicTransactionBuffer.this) {
+                            // sync maxReadPosition change to LAC when TopicTransaction buffer have not recover
+                            // completely the normal message have been sent to broker and state is
+                            // not Ready can't sync maxReadPosition when no ongoing transactions
+                            if (ongoingTxns.isEmpty()) {
+                                maxReadPosition = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry();
+                            }
+                            if (!changeToReadyState()) {
+                                log.error("[{}]Transaction buffer recover fail", topic.getName());
+                            } else {
+                                timer.newTimeout(TopicTransactionBuffer.this,
+                                        takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);
+                                transactionBufferFuture.complete(null);
+                            }
                         }
                     }
 
                     @Override
                     public void noNeedToRecover() {
-                        if (!changeToNoSnapshotState()) {
-                            log.error("[{}]Transaction buffer recover fail", topic.getName());
-                        } else {
-                            transactionBufferFuture.complete(null);
+                        synchronized (TopicTransactionBuffer.this) {
+                            // sync maxReadPosition change to LAC when TopicTransaction buffer have not recover
+                            // completely the normal message have been sent to broker and state is
+                            // not NoSnapshot can't sync maxReadPosition
+                            maxReadPosition = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry();
+                            if (!changeToNoSnapshotState()) {
+                                log.error("[{}]Transaction buffer recover fail", topic.getName());
+                            } else {
+                                transactionBufferFuture.complete(null);
+                            }
                         }
                     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java
index ef1c761..0184b27 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionStablePositionTest.java
@@ -18,33 +18,37 @@
  */
 package org.apache.pulsar.broker.transaction.buffer;
 
+import static org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState.State.NoSnapshot;
+import static org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState.State.Ready;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNull;
-
-import com.google.common.collect.Sets;
-
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
 import java.util.concurrent.TimeUnit;
-
+import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
-
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.transaction.TransactionTestBase;
+import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
+import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
+import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterDataImpl;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.awaitility.Awaitility;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 /**
@@ -157,4 +161,82 @@ public class TransactionStablePositionTest extends TransactionTestBase {
         assertNull(message);
     }
 
+    @DataProvider(name = "enableTransactionAndState")
+    public static Object[][] enableTransactionAndState() {
+        return new Object[][] {
+                { true, TopicTransactionBufferState.State.None },
+                { false, TopicTransactionBufferState.State.None },
+                { true, TopicTransactionBufferState.State.Initializing },
+                { false, TopicTransactionBufferState.State.Initializing }
+        };
+    }
+
+    @Test(dataProvider = "enableTransactionAndState")
+    public void testSyncNormalPositionWhenTBRecover(boolean clientEnableTransaction,
+                                                    TopicTransactionBufferState.State state) throws Exception {
+
+        final String topicName = NAMESPACE1 + "/testSyncNormalPositionWhenTBRecover-"
+                + clientEnableTransaction + state.name();
+        pulsarClient = PulsarClient.builder()
+                .serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
+                .statsInterval(0, TimeUnit.SECONDS)
+                .enableTransaction(clientEnableTransaction)
+                .build();
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .topic(topicName)
+                .create();
+
+        PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0).getBrokerService()
+                .getTopic(TopicName.get(topicName).toString(), false).get().get();
+
+        TopicTransactionBuffer topicTransactionBuffer = (TopicTransactionBuffer) persistentTopic.getTransactionBuffer();
+
+        // wait topic transaction buffer recover success
+        checkTopicTransactionBufferState(clientEnableTransaction, topicTransactionBuffer);
+
+        Field field = TopicTransactionBufferState.class.getDeclaredField("state");
+        field.setAccessible(true);
+        field.set(topicTransactionBuffer, state);
+
+        // init maxReadPosition is PositionImpl.EARLIEST
+        Position position = topicTransactionBuffer.getMaxReadPosition();
+        assertEquals(position, PositionImpl.earliest);
+
+        MessageIdImpl messageId = (MessageIdImpl) producer.send("test".getBytes());
+
+        // send normal message can't change MaxReadPosition when state is None or Initializing
+        position = topicTransactionBuffer.getMaxReadPosition();
+        assertEquals(position, PositionImpl.earliest);
+
+        // invoke recover
+        Method method = TopicTransactionBuffer.class.getDeclaredMethod("recover");
+        method.setAccessible(true);
+        method.invoke(topicTransactionBuffer);
+
+        // change to None state can recover
+        field.set(topicTransactionBuffer, TopicTransactionBufferState.State.None);
+
+        // recover success again
+        checkTopicTransactionBufferState(clientEnableTransaction, topicTransactionBuffer);
+
+        // change MaxReadPosition to normal message position
+        assertEquals(PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId()),
+                topicTransactionBuffer.getMaxReadPosition());
+    }
+
+    private void checkTopicTransactionBufferState(boolean clientEnableTransaction,
+                                                  TopicTransactionBuffer topicTransactionBuffer) {
+        // recover success
+        Awaitility.await().until(() -> {
+            if (clientEnableTransaction) {
+                // recover success, client enable transaction will change to Ready State
+                return topicTransactionBuffer.getStats().state.equals(Ready.name());
+            } else {
+                // recover success, client disable transaction will change to NoSnapshot State
+                return topicTransactionBuffer.getStats().state.equals(NoSnapshot.name());
+            }
+        });
+    }
 }