Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/09/08 11:43:07 UTC
[GitHub] [pulsar] congbobo184 opened a new pull request, #17548: [improve][txn] Implementation of Delayed Transaction Messages
congbobo184 opened a new pull request, #17548:
URL: https://github.com/apache/pulsar/pull/17548
### Motivation
Currently, delayed delivery and transactions cannot be used together.
When a message is sent in a transaction with a delivery delay and the transaction is then committed, consumers receive the message immediately instead of after the delay.
For example:
```
@Test
public void testDelayedTransactionMessages() throws Exception {
    String topic = NAMESPACE1 + "/testDelayedTransactionMessages";

    @Cleanup
    Consumer<String> sharedConsumer = pulsarClient.newConsumer(Schema.STRING)
            .topic(topic)
            .subscriptionName("shared-sub")
            .subscriptionType(SubscriptionType.Shared)
            .subscribe();

    @Cleanup
    Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
            .topic(topic)
            .enableBatching(false)
            .create();

    Transaction transaction = pulsarClient.newTransaction()
            .withTransactionTimeout(10, TimeUnit.SECONDS).build().get();

    // Send delayed messages inside the transaction.
    for (int i = 0; i < 10; i++) {
        producer.newMessage(transaction)
                .value("msg-" + i)
                .deliverAfter(5, TimeUnit.SECONDS)
                .sendAsync();
    }

    producer.flush();

    transaction.commit().get();

    Message<String> msg = sharedConsumer.receive(1, TimeUnit.SECONDS);
    // Before this PR, msg is not null here: the message is delivered
    // right after the commit, ignoring the 5-second delay.
    assertNull(msg);
}
```
This PR lets clients send delayed messages within a transaction.
### Modifications
Allow transaction messages to be added to `trackDelayedDelivery`, so that clients can send delayed messages within a transaction.
Note that the dispatcher must still respect changes to `MaxReadPosition` when it sends transaction messages to consumers (for background on `MaxReadPosition`, see https://github.com/streamnative/community/blob/master/rfc/rfcs/0003-transaction-buffer-design.md).
Because of `maxReadPosition`, a transaction message can only be dispatched once the transaction messages ahead of it have completed. As a result, the actual delay can be longer than requested, but never shorter.
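To make the "longer, but never shorter" behavior concrete, here is a minimal toy model, not the broker code. It assumes that a message is dispatchable only when it is both at or before `maxReadPosition` and past its delivery time. The class `DelayedTxnDispatchSketch`, the `Entry` type, and the `dispatchable` helper are hypothetical names used only for illustration; positions are simplified to plain `long` values.

```java
import java.util.ArrayList;
import java.util.List;

public final class DelayedTxnDispatchSketch {

    /** Hypothetical stand-in for a transactional entry stored in a topic. */
    public static final class Entry {
        final long position;        // simplified position (real positions are ledgerId:entryId)
        final long deliverAtMillis; // time at which the delay expires

        public Entry(long position, long deliverAtMillis) {
            this.position = position;
            this.deliverAtMillis = deliverAtMillis;
        }
    }

    /**
     * Returns the positions that could be dispatched at nowMillis under the
     * assumed rule: readable (position <= maxReadPosition) AND delay expired.
     */
    public static List<Long> dispatchable(List<Entry> entries, long maxReadPosition, long nowMillis) {
        List<Long> result = new ArrayList<>();
        for (Entry e : entries) {
            boolean readable = e.position <= maxReadPosition;
            boolean due = e.deliverAtMillis <= nowMillis;
            if (readable && due) {
                result.add(e.position);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Entry> entries = List.of(new Entry(1, 5_000), new Entry(2, 5_000));

        // Delay not yet expired: nothing is dispatched, even though both are readable.
        System.out.println(dispatchable(entries, 2, 1_000)); // prints []

        // Delay expired, but an earlier open transaction holds maxReadPosition at 1:
        // entry 2 waits longer than its requested delay.
        System.out.println(dispatchable(entries, 1, 6_000)); // prints [1]

        // Once maxReadPosition advances, entry 2 becomes dispatchable.
        System.out.println(dispatchable(entries, 2, 6_000)); // prints [1, 2]
    }
}
```

In this model the effective delivery time of an entry is the later of its delay expiry and the moment `maxReadPosition` reaches it, which is why the delay can only be extended.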
### Verifying this change
Added a test for delayed transaction messages.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar] congbobo184 merged pull request #17548: [improve][txn] Implementation of Delayed Transaction Messages
Posted by GitBox <gi...@apache.org>.
congbobo184 merged PR #17548:
URL: https://github.com/apache/pulsar/pull/17548