You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/11/20 12:20:09 UTC

[pulsar] branch master updated: [Transaction] Support send transaction messages synchronously (#8631)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a1cf30a  [Transaction] Support send transaction messages synchronously (#8631)
a1cf30a is described below

commit a1cf30a5f49bdbe0def2770207746543bdada7c7
Author: ran <ga...@126.com>
AuthorDate: Fri Nov 20 20:19:53 2020 +0800

    [Transaction] Support send transaction messages synchronously (#8631)
    
    ### Motivation
    
    Currently, sending transaction messages in a sync way is disabled, because the design of the `TransactionBuffer` had some changes(refer to #8347), sending transaction messages in a sync way is allowed.
    
    ### Modifications
    
    Remove the transaction check in the `send` method of the class `TypedMessageBuilderImpl`.
---
 .../pulsar/client/impl/TransactionEndToEndTest.java     |  2 +-
 .../pulsar/client/impl/TypedMessageBuilderImpl.java     | 17 ++++++++++++-----
 2 files changed, 13 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index 3ee303f..7dbd37f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -150,7 +150,7 @@ public class TransactionEndToEndTest extends TransactionTestBase {
         int messageCnt = 1000;
         for (int i = 0; i < messageCnt; i++) {
             if (i % 5 == 0) {
-                producer.newMessage(txn1).value(("Hello Txn - " + i).getBytes(UTF_8)).sendAsync();
+                producer.newMessage(txn1).value(("Hello Txn - " + i).getBytes(UTF_8)).send();
                 txn1MessageCnt ++;
             } else {
                 producer.newMessage(txn2).value(("Hello Txn - " + i).getBytes(UTF_8)).sendAsync();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
index d05298a..c922b74 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
@@ -81,12 +81,19 @@ public class TypedMessageBuilderImpl<T> implements TypedMessageBuilder<T> {
 
     @Override
     public MessageId send() throws PulsarClientException {
-        if (null != txn) {
-            // NOTE: it makes no sense to send a transactional message in a blocking way.
-            //       because #send only completes when a transaction is committed or aborted.
-            throw new IllegalStateException("Use sendAsync to send a transactional message");
+        try {
+            // enqueue the message to the buffer
+            CompletableFuture<MessageId> sendFuture = sendAsync();
+
+            if (!sendFuture.isDone()) {
+                // the send request wasn't completed yet (e.g. not failing at enqueuing), then attempt to triggerFlush it out
+                producer.triggerFlush();
+            }
+
+            return sendFuture.get();
+        } catch (Exception e) {
+            throw PulsarClientException.unwrap(e);
         }
-        return producer.send(getMessage());
     }
 
     @Override