You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xi...@apache.org on 2022/12/05 08:45:35 UTC
[pulsar] branch branch-2.10 updated: [improve][txn] Implementation of Delayed Transaction Messages (#17548)
This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new fc71323ec5f [improve][txn] Implementation of Delayed Transaction Messages (#17548)
fc71323ec5f is described below
commit fc71323ec5fff3b7080451c4b5ace5a655bb948f
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Thu Sep 15 10:14:29 2022 +0800
[improve][txn] Implementation of Delayed Transaction Messages (#17548)
link https://github.com/apache/pulsar/pull/17548
### Motivation
now delayed features and transaction messages cannot be used together.
When sending a transaction message with a delayed time and commit this transaction, the message will be immediately received by consumers.
Code, eg.
```
@Test
public void testDelayedTransactionMessages() throws Exception {
String topic = NAMESPACE1 + "/testDelayedTransactionMessages";
@Cleanup
Consumer<String> sharedConsumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("shared-sub")
.subscriptionType(SubscriptionType.Shared)
.subscribe();
@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.enableBatching(false)
.create();
Transaction transaction = pulsarClient.newTransaction()
.withTransactionTimeout(10, TimeUnit.SECONDS).build().get();
// send delayed messages
for (int i = 0; i < 10; i++) {
producer.newMessage(transaction)
.value("msg-" + i)
.deliverAfter(5, TimeUnit.SECONDS)
.sendAsync();
}
producer.flush();
transaction.commit().get();
Message<String> msg = sharedConsumer.receive(1, TimeUnit.SECONDS);
// the msg now is not null
assertNull(msg);
}
```
This PR will implement clients to send delayed messages with transactions.
### Modifications
make transaction message can be put in `trackDelayedDelivery` to implement client send delayed messages with the transaction.
It is worth noting that the dispatcher sends transaction messages to consumers and should follow the `MaxReadPosition` change—(something about `MaxReadPosition` https://github.com/streamnative/community/blob/master/rfc/rfcs/0003-transaction-buffer-design.md).
Because of the existence of maxReadPosition, the distribution of transaction messages depends on whether the previous transaction message is completed. This will cause delay time extended, but not shortened
### Verifying this change
add the test
(cherry picked from commit 1246d793e85cedc9b3fd251f23f1b74492d22120)
---
.../broker/service/AbstractBaseDispatcher.java | 4 +-
.../client/impl/TransactionEndToEndTest.java | 61 ++++++++++++++++++++++
2 files changed, 64 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 7f343617eca..3d3f3db970c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -183,7 +183,9 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
entry.release();
continue;
}
- } else if (msgMetadata == null || Markers.isServerOnlyMarker(msgMetadata)) {
+ }
+
+ if (msgMetadata == null || Markers.isServerOnlyMarker(msgMetadata)) {
PositionImpl pos = (PositionImpl) entry.getPosition();
// Message metadata was corrupted or the messages was a server-only marker
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index 7ff26ddd309..b90037bdbdc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -32,6 +32,8 @@ import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.util.Collection;
import java.util.Optional;
+import java.util.Set;
+import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.ArrayList;
import java.util.List;
@@ -1276,4 +1278,63 @@ public class TransactionEndToEndTest extends TransactionTestBase {
assertEquals(value1, new String(deadLetterConsumer.receive(3, TimeUnit.SECONDS).getValue()));
assertEquals(value2, new String(deadLetterConsumer.receive(3, TimeUnit.SECONDS).getValue()));
}
+
+ @Test
+ public void testDelayedTransactionMessages() throws Exception {
+ String topic = NAMESPACE1 + "/testDelayedTransactionMessages";
+
+ @Cleanup
+ Consumer<String> failoverConsumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName("failover-sub")
+ .subscriptionType(SubscriptionType.Failover)
+ .subscribe();
+
+ @Cleanup
+ Consumer<String> sharedConsumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName("shared-sub")
+ .subscriptionType(SubscriptionType.Shared)
+ .subscribe();
+
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topic)
+ .enableBatching(false)
+ .create();
+
+ Transaction transaction = pulsarClient.newTransaction()
+ .withTransactionTimeout(10, TimeUnit.SECONDS).build().get();
+ for (int i = 0; i < 10; i++) {
+ producer.newMessage(transaction)
+ .value("msg-" + i)
+ .deliverAfter(5, TimeUnit.SECONDS)
+ .sendAsync();
+ }
+
+ producer.flush();
+
+ transaction.commit().get();
+
+ // Failover consumer will receive the messages immediately while
+ // the shared consumer will get them after the delay
+ Message<String> msg = sharedConsumer.receive(1, TimeUnit.SECONDS);
+ assertNull(msg);
+
+ for (int i = 0; i < 10; i++) {
+ msg = failoverConsumer.receive(100, TimeUnit.MILLISECONDS);
+ assertEquals(msg.getValue(), "msg-" + i);
+ }
+
+ Set<String> receivedMsgs = new TreeSet<>();
+ for (int i = 0; i < 10; i++) {
+ msg = sharedConsumer.receive(10, TimeUnit.SECONDS);
+ receivedMsgs.add(msg.getValue());
+ }
+
+ assertEquals(receivedMsgs.size(), 10);
+ for (int i = 0; i < 10; i++) {
+ assertTrue(receivedMsgs.contains("msg-" + i));
+ }
+ }
}