You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2022/06/06 02:48:21 UTC
[rocketmq] branch develop updated: [ISSUE #2435] Solve the problem that DefaultMQProducer#request() sends messages and waits for timeout synchronously (#4313)
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 1f71c39c5 [ISSUE #2435] Solve the problem that DefaultMQProducer#request() sends messages and waits for timeout synchronously (#4313)
1f71c39c5 is described below
commit 1f71c39c54f6414900e571babe5f5f579ed08078
Author: 李晓双 Li Xiao Shuang <64...@qq.com>
AuthorDate: Mon Jun 6 10:48:16 2022 +0800
[ISSUE #2435] Solve the problem that DefaultMQProducer#request() sends messages and waits for timeout synchronously (#4313)
* Fix message callback timeout bug
* Clean up code format
* Fix message callback timeout bug
Co-authored-by: Heng Du <du...@apache.org>
---
.../client/impl/producer/DefaultMQProducerImpl.java | 16 +++++++---------
.../org/apache/rocketmq/example/rpc/RequestProducer.java | 2 ++
2 files changed, 9 insertions(+), 9 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 3be55ba49..668f9b6b6 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -480,14 +480,12 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
/**
- * @deprecated
- * It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. A new one will be
- * provided in next version
- *
* @param msg
* @param sendCallback
* @param timeout the <code>sendCallback</code> will be invoked at most time
* @throws RejectedExecutionException
+ * @deprecated It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. A new one will be
+ * provided in next version
*/
@Deprecated
public void send(final Message msg, final SendCallback sendCallback, final long timeout)
@@ -1034,10 +1032,6 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
/**
- * @deprecated
- * It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. A new one will be
- * provided in next version
- *
* @param msg
* @param mq
* @param sendCallback
@@ -1045,6 +1039,8 @@ public class DefaultMQProducerImpl implements MQProducerInner {
* @throws MQClientException
* @throws RemotingException
* @throws InterruptedException
+ * @deprecated It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. A new one will be
+ * provided in next version
*/
@Deprecated
public void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, final long timeout)
@@ -1367,7 +1363,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}
- public Message request(Message msg,
+ public Message request(final Message msg,
long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginTimestamp = System.currentTimeMillis();
prepareSendRequest(msg, timeout);
@@ -1382,6 +1378,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
@Override
public void onSuccess(SendResult sendResult) {
requestResponseFuture.setSendRequestOk(true);
+ requestResponseFuture.putResponseMessage(msg);
}
@Override
@@ -1412,6 +1409,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
@Override
public void onSuccess(SendResult sendResult) {
requestResponseFuture.setSendRequestOk(true);
+ requestResponseFuture.executeRequestCallback();
}
@Override
diff --git a/example/src/main/java/org/apache/rocketmq/example/rpc/RequestProducer.java b/example/src/main/java/org/apache/rocketmq/example/rpc/RequestProducer.java
index f200f77da..69048de2d 100644
--- a/example/src/main/java/org/apache/rocketmq/example/rpc/RequestProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/rpc/RequestProducer.java
@@ -29,6 +29,8 @@ public class RequestProducer {
long ttl = 3000;
DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
+
+ //You need to set namesrvAddr to the address of the local namesrv
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();