You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xi...@apache.org on 2022/12/05 08:45:35 UTC

[pulsar] branch branch-2.10 updated: [improve][txn] Implementation of Delayed Transaction Messages (#17548)

This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new fc71323ec5f [improve][txn] Implementation of Delayed Transaction Messages (#17548)
fc71323ec5f is described below

commit fc71323ec5fff3b7080451c4b5ace5a655bb948f
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Thu Sep 15 10:14:29 2022 +0800

    [improve][txn] Implementation of Delayed Transaction Messages (#17548)
    
    link https://github.com/apache/pulsar/pull/17548
    ### Motivation
    now delayed features and transaction messages cannot be used together.
    When sending a transaction message with a delayed time and commit this transaction, the message will be immediately received by consumers.
    
    Code, eg.
    ```
        @Test
        public void testDelayedTransactionMessages() throws Exception {
            String topic = NAMESPACE1 + "/testDelayedTransactionMessages";
    
            @Cleanup
            Consumer<String> sharedConsumer = pulsarClient.newConsumer(Schema.STRING)
                    .topic(topic)
                    .subscriptionName("shared-sub")
                    .subscriptionType(SubscriptionType.Shared)
                    .subscribe();
    
            @Cleanup
            Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                    .topic(topic)
                    .enableBatching(false)
                    .create();
    
            Transaction transaction = pulsarClient.newTransaction()
                    .withTransactionTimeout(10, TimeUnit.SECONDS).build().get();
    
            // send delayed messages
            for (int i = 0; i < 10; i++) {
                producer.newMessage(transaction)
                        .value("msg-" + i)
                        .deliverAfter(5, TimeUnit.SECONDS)
                        .sendAsync();
            }
    
            producer.flush();
    
            transaction.commit().get();
    
            Message<String> msg = sharedConsumer.receive(1, TimeUnit.SECONDS);
            // the msg now is not null
            assertNull(msg);
        }
    ```
    This PR will implement clients to send delayed messages with transactions.
    
    ### Modifications
    make transaction message can be put in `trackDelayedDelivery` to implement client send delayed messages with the transaction.
    
    It is worth noting that the dispatcher sends transaction messages to consumers and should follow the `MaxReadPosition` change—(something about `MaxReadPosition` https://github.com/streamnative/community/blob/master/rfc/rfcs/0003-transaction-buffer-design.md).
    
    Because of the existence of maxReadPosition, the distribution of transaction messages depends on whether the previous transaction message is completed. This will cause delay time extended, but not shortened
    
    ### Verifying this change
    add the test
    
    (cherry picked from commit 1246d793e85cedc9b3fd251f23f1b74492d22120)
---
 .../broker/service/AbstractBaseDispatcher.java     |  4 +-
 .../client/impl/TransactionEndToEndTest.java       | 61 ++++++++++++++++++++++
 2 files changed, 64 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 7f343617eca..3d3f3db970c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -183,7 +183,9 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
                     entry.release();
                     continue;
                 }
-            } else if (msgMetadata == null || Markers.isServerOnlyMarker(msgMetadata)) {
+            }
+
+            if (msgMetadata == null || Markers.isServerOnlyMarker(msgMetadata)) {
                 PositionImpl pos = (PositionImpl) entry.getPosition();
                 // Message metadata was corrupted or the messages was a server-only marker
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index 7ff26ddd309..b90037bdbdc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -32,6 +32,8 @@ import java.lang.reflect.Constructor;
 import java.lang.reflect.Field;
 import java.util.Collection;
 import java.util.Optional;
+import java.util.Set;
+import java.util.TreeSet;
 import java.util.concurrent.CompletableFuture;
 import java.util.ArrayList;
 import java.util.List;
@@ -1276,4 +1278,63 @@ public class TransactionEndToEndTest extends TransactionTestBase {
         assertEquals(value1, new String(deadLetterConsumer.receive(3, TimeUnit.SECONDS).getValue()));
         assertEquals(value2, new String(deadLetterConsumer.receive(3, TimeUnit.SECONDS).getValue()));
     }
+
+    @Test
+    public void testDelayedTransactionMessages() throws Exception {
+        String topic = NAMESPACE1 + "/testDelayedTransactionMessages";
+
+        @Cleanup
+        Consumer<String> failoverConsumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("failover-sub")
+                .subscriptionType(SubscriptionType.Failover)
+                .subscribe();
+
+        @Cleanup
+        Consumer<String> sharedConsumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("shared-sub")
+                .subscriptionType(SubscriptionType.Shared)
+                .subscribe();
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .enableBatching(false)
+                .create();
+
+        Transaction transaction = pulsarClient.newTransaction()
+                .withTransactionTimeout(10, TimeUnit.SECONDS).build().get();
+        for (int i = 0; i < 10; i++) {
+            producer.newMessage(transaction)
+                    .value("msg-" + i)
+                    .deliverAfter(5, TimeUnit.SECONDS)
+                    .sendAsync();
+        }
+
+        producer.flush();
+
+        transaction.commit().get();
+
+        // Failover consumer will receive the messages immediately while
+        // the shared consumer will get them after the delay
+        Message<String> msg = sharedConsumer.receive(1, TimeUnit.SECONDS);
+        assertNull(msg);
+
+        for (int i = 0; i < 10; i++) {
+            msg = failoverConsumer.receive(100, TimeUnit.MILLISECONDS);
+            assertEquals(msg.getValue(), "msg-" + i);
+        }
+
+        Set<String> receivedMsgs = new TreeSet<>();
+        for (int i = 0; i < 10; i++) {
+            msg = sharedConsumer.receive(10, TimeUnit.SECONDS);
+            receivedMsgs.add(msg.getValue());
+        }
+
+        assertEquals(receivedMsgs.size(), 10);
+        for (int i = 0; i < 10; i++) {
+            assertTrue(receivedMsgs.contains("msg-" + i));
+        }
+    }
 }