You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2022/06/06 02:48:21 UTC

[rocketmq] branch develop updated: [ISSUE #2435] Solve the problem that DefaultMQProducer#request() sends messages and waits for timeout synchronously (#4313)

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 1f71c39c5 [ISSUE #2435] Solve the problem that DefaultMQProducer#request() sends messages and waits for timeout synchronously (#4313)
1f71c39c5 is described below

commit 1f71c39c54f6414900e571babe5f5f579ed08078
Author: 李晓双 Li Xiao Shuang <64...@qq.com>
AuthorDate: Mon Jun 6 10:48:16 2022 +0800

    [ISSUE #2435] Solve the problem that DefaultMQProducer#request() sends messages and waits for timeout synchronously (#4313)
    
    * Fix message callback timeout bug
    
    * Clean up code format
    
    * Fix message callback timeout bug
    
    Co-authored-by: Heng Du <du...@apache.org>
---
 .../client/impl/producer/DefaultMQProducerImpl.java      | 16 +++++++---------
 .../org/apache/rocketmq/example/rpc/RequestProducer.java |  2 ++
 2 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 3be55ba49..668f9b6b6 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -480,14 +480,12 @@ public class DefaultMQProducerImpl implements MQProducerInner {
     }
 
     /**
-     * @deprecated
-     * It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. A new one will be
-     * provided in next version
-     *
      * @param msg
      * @param sendCallback
      * @param timeout      the <code>sendCallback</code> will be invoked at most time
      * @throws RejectedExecutionException
+     * @deprecated It will be removed in 4.4.0 because of its exception handling and the wrong semantics of timeout. A new one will be
+     * provided in the next version.
      */
     @Deprecated
     public void send(final Message msg, final SendCallback sendCallback, final long timeout)
@@ -1034,10 +1032,6 @@ public class DefaultMQProducerImpl implements MQProducerInner {
     }
 
     /**
-     * @deprecated
-     * It will be removed at 4.4.0 cause for exception handling and the wrong Semantics of timeout. A new one will be
-     * provided in next version
-     *
      * @param msg
      * @param mq
      * @param sendCallback
@@ -1045,6 +1039,8 @@ public class DefaultMQProducerImpl implements MQProducerInner {
      * @throws MQClientException
      * @throws RemotingException
      * @throws InterruptedException
+     * @deprecated It will be removed in 4.4.0 because of its exception handling and the wrong semantics of timeout. A new one will be
+     * provided in the next version.
      */
     @Deprecated
     public void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, final long timeout)
@@ -1367,7 +1363,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
         return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
     }
 
-    public Message request(Message msg,
+    public Message request(final Message msg,
         long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException {
         long beginTimestamp = System.currentTimeMillis();
         prepareSendRequest(msg, timeout);
@@ -1382,6 +1378,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
                 @Override
                 public void onSuccess(SendResult sendResult) {
                     requestResponseFuture.setSendRequestOk(true);
+                    requestResponseFuture.putResponseMessage(msg);
                 }
 
                 @Override
@@ -1412,6 +1409,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
             @Override
             public void onSuccess(SendResult sendResult) {
                 requestResponseFuture.setSendRequestOk(true);
+                requestResponseFuture.executeRequestCallback();
             }
 
             @Override
diff --git a/example/src/main/java/org/apache/rocketmq/example/rpc/RequestProducer.java b/example/src/main/java/org/apache/rocketmq/example/rpc/RequestProducer.java
index f200f77da..69048de2d 100644
--- a/example/src/main/java/org/apache/rocketmq/example/rpc/RequestProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/rpc/RequestProducer.java
@@ -29,6 +29,8 @@ public class RequestProducer {
         long ttl = 3000;
 
         DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
+
+        // You need to set namesrvAddr to the address of your local name server
         producer.setNamesrvAddr("127.0.0.1:9876");
 
         producer.start();