You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/20 13:14:00 UTC

[pulsar] 08/15: [Transaction]Fix maxReadPosition with normal publish (#12386)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ed519055161f637c81349282176cb151a6dafc7a
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Wed Nov 3 12:57:06 2021 +0800

    [Transaction]Fix maxReadPosition with normal publish (#12386)
    
    now Transaction buffer syncMaxReadPositionForNormalPublish don't wait recover success. Transaction buffer recover is asynchronous, so we need to wait buffer recover success then syncMaxReadPositionForNormalPublish when producer normal message.
    
    We should not change maxReadPosition before TransactionBuffer recovers completely.
    
    It will change when the state of TB is NoSnapshot or the state of TB is Ready  but the ongoingTxn is empty.
    
    (cherry picked from commit 32c8f865f731f739065ef900126973be258d3801)
---
 .../buffer/impl/TopicTransactionBuffer.java        |  7 +-
 .../broker/transaction/TransactionConsumeTest.java | 17 ++++-
 .../pulsar/broker/transaction/TransactionTest.java | 81 +++++++++++++++++++++-
 3 files changed, 99 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index 78f9295..84b209c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -455,9 +455,14 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
         // when ongoing transaction is empty, proved that lastAddConfirm is can read max position, because callback
         // thread is the same tread, in this time the lastAddConfirm don't content transaction message.
         synchronized (TopicTransactionBuffer.this) {
-            if (ongoingTxns.isEmpty()) {
+            if (checkIfNoSnapshot()) {
                 maxReadPosition = position;
                 changeMaxReadPositionAndAddAbortTimes.incrementAndGet();
+            } else if (checkIfReady()) {
+                if (ongoingTxns.isEmpty()) {
+                    maxReadPosition = position;
+                    changeMaxReadPositionAndAddAbortTimes.incrementAndGet();
+                }
             }
         }
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
index 82fd1d1..d381487 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java
@@ -36,12 +36,16 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.api.proto.MessageIdData;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.api.proto.TxnAction;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
@@ -75,6 +79,11 @@ public class TransactionConsumeTest extends TransactionTestBase {
                 new TenantInfoImpl(Sets.newHashSet(), Sets.newHashSet(CLUSTER_NAME)));
         admin.namespaces().createNamespace("public/txn", 10);
         admin.topics().createNonPartitionedTopic(CONSUME_TOPIC);
+
+        admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
+                new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
+        admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
+        admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 1);
     }
 
     @AfterMethod(alwaysRun = true)
@@ -229,7 +238,12 @@ public class TransactionConsumeTest extends TransactionTestBase {
 
     private List<MessageIdData> appendTransactionMessages(
             TxnID txnID, PersistentTopic topic, int transactionMsgCnt, List<String> sendMessageList)
-            throws ExecutionException, InterruptedException {
+            throws ExecutionException, InterruptedException, PulsarClientException {
+        //Change the state of TB to Ready.
+        @Cleanup
+        Producer<String> producer = PulsarClient.builder().serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl())
+                .enableTransaction(true).build()
+                .newProducer(Schema.STRING).topic(CONSUME_TOPIC).sendTimeout(0, TimeUnit.SECONDS).create();
         List<MessageIdData> positionList = new ArrayList<>();
         for (int i = 0; i < transactionMsgCnt; i++) {
             final int j = i;
@@ -239,7 +253,6 @@ public class TransactionConsumeTest extends TransactionTestBase {
                     .setTxnidMostBits(txnID.getMostSigBits())
                     .setTxnidLeastBits(txnID.getLeastSigBits())
                     .setPublishTime(System.currentTimeMillis());
-
             String msg = TXN_MSG_CONTENT + i;
             sendMessageList.add(msg);
             ByteBuf headerAndPayload = Commands.serializeMetadataAndPayload(
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 31d8113..a237314 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -26,7 +26,6 @@ import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
-import com.google.common.collect.Sets;
 import io.netty.buffer.Unpooled;
 import java.lang.reflect.Field;
 import java.util.List;
@@ -42,10 +41,13 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
+import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
 import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
 import org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
 import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
@@ -68,9 +70,7 @@ import org.apache.pulsar.common.events.EventsTopicNames;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.awaitility.Awaitility;
@@ -409,4 +409,79 @@ public class TransactionTest extends TransactionTestBase {
             return true;
         });
     }
+
+    @Test
+    public void testMaxReadPositionForNormalPublish() throws Exception{
+        String topic = "persistent://" + NAMESPACE1 + "/NormalPublish";
+        admin.topics().createNonPartitionedTopic(topic);
+        PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0).getBrokerService()
+                  .getTopic(topic, false).get().get();
+
+        TopicTransactionBuffer topicTransactionBuffer = (TopicTransactionBuffer) persistentTopic.getTransactionBuffer();
+        PulsarClient noTxnClient = PulsarClient.builder().enableTransaction(false)
+                .serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl()).build();
+
+        //test the state of TransactionBuffer is NoSnapshot
+        //before build Producer by pulsarClient that enables transaction.
+        Producer<String> normalProducer = noTxnClient.newProducer(Schema.STRING)
+                .producerName("testNormalPublish")
+                .topic(topic)
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .create();
+        Awaitility.await().untilAsserted(() -> Assert.assertTrue(topicTransactionBuffer.checkIfNoSnapshot()));
+
+        //test publishing normal messages will change maxReadPosition in the state of NoSnapshot.
+        MessageIdImpl messageId = (MessageIdImpl) normalProducer.newMessage().value("normal message").send();
+        PositionImpl position = topicTransactionBuffer.getMaxReadPosition();
+        Assert.assertEquals(position.getLedgerId(), messageId.getLedgerId());
+        Assert.assertEquals(position.getEntryId(), messageId.getEntryId());
+
+        //test the state of TransactionBuffer is Ready after build Producer by pulsarClient that enables transaction.
+        Producer<String> txnProducer = pulsarClient.newProducer(Schema.STRING)
+                .producerName("testTransactionPublish")
+                .topic(topic)
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .create();
+
+        Awaitility.await().untilAsserted(() ->Assert.assertTrue(topicTransactionBuffer.checkIfReady()));
+        //test publishing txn messages will not change maxReadPosition if don`t commit or abort.
+        Transaction transaction = pulsarClient.newTransaction()
+                .withTransactionTimeout(5, TimeUnit.SECONDS).build().get();
+        MessageIdImpl messageId1 = (MessageIdImpl) txnProducer.newMessage(transaction).value("txn message").send();
+        PositionImpl position1 = topicTransactionBuffer.getMaxReadPosition();
+        Assert.assertEquals(position1.getLedgerId(), messageId.getLedgerId());
+        Assert.assertEquals(position1.getEntryId(), messageId.getEntryId());
+
+        MessageIdImpl messageId2 = (MessageIdImpl) normalProducer.newMessage().value("normal message").send();
+        PositionImpl position2 = topicTransactionBuffer.getMaxReadPosition();
+        Assert.assertEquals(position2.getLedgerId(), messageId.getLedgerId());
+        Assert.assertEquals(position2.getEntryId(), messageId.getEntryId());
+        transaction.commit().get();
+        PositionImpl position3 = topicTransactionBuffer.getMaxReadPosition();
+
+        Assert.assertEquals(position3.getLedgerId(), messageId2.getLedgerId());
+        Assert.assertEquals(position3.getEntryId(), messageId2.getEntryId() + 1);
+
+        //test publishing normal messages will change maxReadPosition if the state of TB
+        //is Ready and ongoingTxns is empty.
+        MessageIdImpl messageId4 = (MessageIdImpl) normalProducer.newMessage().value("normal message").send();
+        PositionImpl position4 = topicTransactionBuffer.getMaxReadPosition();
+        Assert.assertEquals(position4.getLedgerId(), messageId4.getLedgerId());
+        Assert.assertEquals(position4.getEntryId(), messageId4.getEntryId());
+
+        //test publishing normal messages will not change maxReadPosition if the state o TB is Initializing.
+        Class<TopicTransactionBufferState> transactionBufferStateClass =
+                (Class<TopicTransactionBufferState>) topicTransactionBuffer.getClass().getSuperclass();
+        Field field = transactionBufferStateClass.getDeclaredField("state");
+        field.setAccessible(true);
+        Class<TopicTransactionBuffer> topicTransactionBufferClass = TopicTransactionBuffer.class;
+        Field maxReadPositionField = topicTransactionBufferClass.getDeclaredField("maxReadPosition");
+        maxReadPositionField.setAccessible(true);
+        field.set(topicTransactionBuffer, TopicTransactionBufferState.State.Initializing);
+        MessageIdImpl messageId5 = (MessageIdImpl) normalProducer.newMessage().value("normal message").send();
+        PositionImpl position5 = (PositionImpl) maxReadPositionField.get(topicTransactionBuffer);
+        Assert.assertEquals(position5.getLedgerId(), messageId4.getLedgerId());
+        Assert.assertEquals(position5.getEntryId(), messageId4.getEntryId());
+
+        }
 }