You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/11/20 12:20:09 UTC
[pulsar] branch master updated: [Transaction] Support send
transaction messages synchronously (#8631)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a1cf30a [Transaction] Support send transaction messages synchronously (#8631)
a1cf30a is described below
commit a1cf30a5f49bdbe0def2770207746543bdada7c7
Author: ran <ga...@126.com>
AuthorDate: Fri Nov 20 20:19:53 2020 +0800
[Transaction] Support send transaction messages synchronously (#8631)
### Motivation
Currently, sending transaction messages synchronously is disabled. Because the design of the `TransactionBuffer` has changed (refer to #8347), sending transaction messages synchronously can now be allowed.
### Modifications
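Remove the transaction check in the `send` method of the class `TypedMessageBuilderImpl`. The `send` method now calls `sendAsync()`, triggers a producer flush if the returned future has not completed yet, waits for the result, and rethrows any failure via `PulsarClientException.unwrap`.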
---
.../pulsar/client/impl/TransactionEndToEndTest.java | 2 +-
.../pulsar/client/impl/TypedMessageBuilderImpl.java | 17 ++++++++++++-----
2 files changed, 13 insertions(+), 6 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index 3ee303f..7dbd37f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -150,7 +150,7 @@ public class TransactionEndToEndTest extends TransactionTestBase {
int messageCnt = 1000;
for (int i = 0; i < messageCnt; i++) {
if (i % 5 == 0) {
- producer.newMessage(txn1).value(("Hello Txn - " + i).getBytes(UTF_8)).sendAsync();
+ producer.newMessage(txn1).value(("Hello Txn - " + i).getBytes(UTF_8)).send();
txn1MessageCnt ++;
} else {
producer.newMessage(txn2).value(("Hello Txn - " + i).getBytes(UTF_8)).sendAsync();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
index d05298a..c922b74 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
@@ -81,12 +81,19 @@ public class TypedMessageBuilderImpl<T> implements TypedMessageBuilder<T> {
@Override
public MessageId send() throws PulsarClientException {
- if (null != txn) {
- // NOTE: it makes no sense to send a transactional message in a blocking way.
- // because #send only completes when a transaction is committed or aborted.
- throw new IllegalStateException("Use sendAsync to send a transactional message");
+ try {
+ // enqueue the message to the buffer
+ CompletableFuture<MessageId> sendFuture = sendAsync();
+
+ if (!sendFuture.isDone()) {
+ // the send request wasn't completed yet (e.g. not failing at enqueuing), then attempt to triggerFlush it out
+ producer.triggerFlush();
+ }
+
+ return sendFuture.get();
+ } catch (Exception e) {
+ throw PulsarClientException.unwrap(e);
}
- return producer.send(getMessage());
}
@Override