Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/09/08 11:43:07 UTC

[GitHub] [pulsar] congbobo184 opened a new pull request, #17548: [improve][txn] Implementation of Delayed Transaction Messages

congbobo184 opened a new pull request, #17548:
URL: https://github.com/apache/pulsar/pull/17548

   ### Motivation
   Currently, delayed delivery and transactions cannot be used together.
   When a message is sent in a transaction with a delivery delay and the transaction is committed, consumers receive the message immediately instead of after the delay.
   
   For example, the following test currently fails:
   ```java
       @Test
       public void testDelayedTransactionMessages() throws Exception {
           String topic = NAMESPACE1 + "/testDelayedTransactionMessages";
   
           @Cleanup
           Consumer<String> sharedConsumer = pulsarClient.newConsumer(Schema.STRING)
                   .topic(topic)
                   .subscriptionName("shared-sub")
                   .subscriptionType(SubscriptionType.Shared)
                   .subscribe();
   
           @Cleanup
           Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                   .topic(topic)
                   .enableBatching(false)
                   .create();
   
           Transaction transaction = pulsarClient.newTransaction()
                   .withTransactionTimeout(10, TimeUnit.SECONDS).build().get();
   
           // send delayed messages
           for (int i = 0; i < 10; i++) {
               producer.newMessage(transaction)
                       .value("msg-" + i)
                       .deliverAfter(5, TimeUnit.SECONDS)
                       .sendAsync();
           }
   
           producer.flush();
   
           transaction.commit().get();
   
           Message<String> msg = sharedConsumer.receive(1, TimeUnit.SECONDS);
           // Before this change, msg is not null here because the delay is ignored
           assertNull(msg);
       }
   ```
   This PR allows clients to send delayed messages within a transaction.
   
   ### Modifications
   Allow transactional messages to be added to `trackDelayedDelivery`, so that clients can send delayed messages within a transaction.
   
   Note that when the dispatcher sends transactional messages to consumers, it must still follow changes to `MaxReadPosition` (for background on `MaxReadPosition`, see https://github.com/streamnative/community/blob/master/rfc/rfcs/0003-transaction-buffer-design.md).
   
   Because of `maxReadPosition`, a transactional message can be dispatched only after the preceding transactions have completed. As a result, the effective delay may be longer than the requested delay, but never shorter.
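   
   The interaction between the requested delay and `maxReadPosition` can be illustrated with a minimal sketch. This is not code from this PR or from the broker: the class `EffectiveDelaySketch`, the method `earliestDispatchMillis`, and the parameter `readableAtMillis` (the time at which `maxReadPosition` has advanced past the message) are hypothetical names used only to show why the delay can be extended but not shortened.
   
   ```java
   public class EffectiveDelaySketch {
   
       /**
        * Hypothetical helper, not a Pulsar API.
        *
        * @param deliverAtMillis  time at which the requested delay expires
        * @param readableAtMillis assumed time at which maxReadPosition has moved
        *                         past the message, i.e. preceding transactions are done
        * @return the earliest time at which the message could be dispatched
        */
       public static long earliestDispatchMillis(long deliverAtMillis, long readableAtMillis) {
           // Both conditions must hold, so the later of the two times wins.
           return Math.max(deliverAtMillis, readableAtMillis);
       }
   
       public static void main(String[] args) {
           long deliverAt = 5_000;
   
           // Preceding transactions finish early: the requested delay is honored.
           System.out.println(earliestDispatchMillis(deliverAt, 2_000)); // prints 5000
   
           // A preceding transaction stays open until t = 8000: the delay is extended.
           System.out.println(earliestDispatchMillis(deliverAt, 8_000)); // prints 8000
       }
   }
   ```
   
   In both cases the result is never earlier than `deliverAt`, which matches the statement above that the delay is only ever extended.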
   
   ### Verifying this change
   This change adds a test.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] congbobo184 merged pull request #17548: [improve][txn] Implementation of Delayed Transaction Messages

Posted by GitBox <gi...@apache.org>.
congbobo184 merged PR #17548:
URL: https://github.com/apache/pulsar/pull/17548

