Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2019/12/20 08:47:18 UTC

[GitHub] [rocketmq-spring] xiangwangcheng opened a new pull request #209: [ISSUE #208]support request/response model in rocketmq-spring

xiangwangcheng opened a new pull request #209: [ISSUE #208]support request/response model in rocketmq-spring
URL: https://github.com/apache/rocketmq-spring/pull/209
 
 
   ## What is the purpose of the change
   
   Add the feature requested in #208.
   
   ## Brief changelog
   Add request-sending methods to RocketMQTemplate, covering:
   1) sync mode
   2) sync mode with orderly sending
   3) async mode
   4) async mode with orderly sending
   
   Test cases are included.
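   
   For readers new to the model, the four modes listed above can be illustrated with a small in-memory sketch. It is not the code in this PR: `RequestReplySketch`, `requestSync`, `requestAsync`, `onReply` and `selectQueue` are hypothetical names, the broker round trip is simulated with a `CompletableFuture`, and the hash-based queue choice is only an assumption about how an orderly send might pick a queue.
   
   ```java
   import java.util.Map;
   import java.util.UUID;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;
   import java.util.function.Function;
   
   // Hypothetical in-memory sketch; these names are not from RocketMQ or rocketmq-spring.
   class RequestReplySketch {
       // Requests waiting for a reply, keyed by correlation ID.
       private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();
       // Stand-in for the remote consumer that produces the reply.
       private final Function<String, String> responder;
   
       RequestReplySketch(Function<String, String> responder) {
           this.responder = responder;
       }
   
       // Async mode: returns immediately; the future completes when the reply arrives.
       CompletableFuture<String> requestAsync(String payload) {
           String correlationId = UUID.randomUUID().toString();
           CompletableFuture<String> future = new CompletableFuture<>();
           pending.put(correlationId, future);
           // Simulated round trip: the responder runs on another thread.
           CompletableFuture.runAsync(() -> onReply(correlationId, responder.apply(payload)));
           return future;
       }
   
       // Sync mode: the same request, but the caller blocks until the reply or the timeout.
       String requestSync(String payload, long timeoutMillis) {
           try {
               return requestAsync(payload).get(timeoutMillis, TimeUnit.MILLISECONDS);
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               throw new IllegalStateException("interrupted while waiting for reply", e);
           } catch (ExecutionException | TimeoutException e) {
               throw new IllegalStateException("request failed or timed out", e);
           }
       }
   
       // Matches an incoming reply to the request that is waiting for it.
       void onReply(String correlationId, String reply) {
           CompletableFuture<String> future = pending.remove(correlationId);
           if (future != null) {
               future.complete(reply);
           }
       }
   
       // Orderly mode (assumed): the same hash key always maps to the same queue index.
       static int selectQueue(String hashKey, int queueCount) {
           return Math.floorMod(hashKey.hashCode(), queueCount);
       }
   }
   ```
   
   In this sketch the sync call is simply the async call plus a bounded wait, which is one reason both variants can share the same conversion and validation code.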
   
   ## Verifying this change
   
   XXXX
   
   Follow this checklist to help us incorporate your contribution quickly and easily. Note: `it would be helpful if you could finish the following 5 checklist items (the last one is not necessary) before requesting that the community review your PR`.
   
   - [√] Make sure there is a [Github issue](https://github.com/apache/rocketmq/issues) filed for the change (usually before you start working on it). Trivial changes like typos do not require a Github issue. Your pull request should address just this issue, without pulling in other changes - one PR resolves one issue. 
   - [√] Format the pull request title like `[ISSUE #123] Fix UnknownException when host config not exist`. Each commit in the pull request should have a meaningful subject line and body.
   - [√] Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
   - [√] Write the necessary unit tests (over 80% coverage) to verify that your logic is correct; prefer mocking when cross-module dependencies exist. 
   - [√] Run `mvn -B clean apache-rat:check findbugs:findbugs checkstyle:checkstyle` to make sure basic checks pass. Run `mvn clean install -DskipITs` to make sure unit tests pass. Run `mvn clean test-compile failsafe:integration-test` to make sure integration tests pass.
   - [ ] If this contribution is large, please file an [Apache Individual Contributor License Agreement](http://www.apache.org/licenses/#clas).
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [rocketmq-spring] RongtongJin commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring

Posted by GitBox <gi...@apache.org>.
RongtongJin commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring
URL: https://github.com/apache/rocketmq-spring/pull/209#discussion_r361106844
 
 

 ##########
 File path: rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/ConsumerWithReply.java
 ##########
 @@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.samples.springboot.consumer;
+
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.utils.MessageUtil;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.core.RocketMQListener;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * RocketMQMessageListener
+ */
+@Service
+@RocketMQMessageListener(topic = "${demo.rocketmq.requestTopic}", consumerGroup = "request_consumer", selectorExpression = "${demo.rocketmq.tag}")
+public class ConsumerWithReply implements RocketMQListener<MessageExt> {
+
+    @Autowired
+    private DefaultMQProducer replyProducer;
+
+    @Override
+    public void onMessage(MessageExt message) {
+        System.out.printf("------- StringConsumer received: %s \n", message);
+        try {
+            String replyTo = MessageUtil.getReplyToClient(message);
+            byte[] replyContent = "reply message contents.".getBytes();
+            // create reply message with given util, do not create reply message by yourself
+            Message replyMessage = MessageUtil.createReplyMessage(message, replyContent);
+
+            // send reply message with producer
+            SendResult replyResult = replyProducer.send(replyMessage, 3000);
+            System.out.printf("reply to %s , %s %n", replyTo, replyResult.toString());
+        }catch(MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
+            System.out.println(e.getLocalizedMessage());
+        }
+    }
+}
 
 Review comment:
   If the user uses `RocketMQListener<String>` to receive messages, they cannot use the request-reply model with the current implementation. IMO, on the consumer side we need a new interface, such as
   ```java
   public interface RocketMQReplyListener<T> {
       Message<?> onMessage(T message);
   }
   ```
   so that the user does not have to send the reply message themselves.
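   
   To make the idea concrete, here is a minimal, dependency-free sketch of a listener that returns its reply and a container that sends it. `ReplyListenerSketch`, `ReplyContainerSketch`, `dispatch` and `replySender` are hypothetical names used only for illustration. Unlike the interface above, the sketch returns a plain value rather than a Spring `Message<?>`, and it assumes UTF-8 string payloads.
   
   ```java
   import java.nio.charset.StandardCharsets;
   import java.util.function.Consumer;
   
   // Hypothetical stand-in for a reply-capable listener: the return value is the reply.
   interface ReplyListenerSketch<T, R> {
       R onMessage(T message);
   }
   
   // Hypothetical stand-in for the listener container.
   class ReplyContainerSketch {
       // Decodes the request, invokes the user's listener, then sends the reply on the user's behalf.
       static void dispatch(byte[] requestBody,
                            ReplyListenerSketch<String, String> listener,
                            Consumer<byte[]> replySender) {
           String request = new String(requestBody, StandardCharsets.UTF_8);
           String reply = listener.onMessage(request);
           // The user never touches the producer; the container owns the reply path.
           replySender.accept(reply.getBytes(StandardCharsets.UTF_8));
       }
   }
   ```
   
   With this shape, user code shrinks to something like `request -> "reply to " + request`, and the producer wiring shown in `ConsumerWithReply` would move into the framework.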


[GitHub] [rocketmq-spring] RongtongJin commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring

Posted by GitBox <gi...@apache.org>.
RongtongJin commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring
URL: https://github.com/apache/rocketmq-spring/pull/209#discussion_r364544448
 
 

 ##########
 File path: rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java
 ##########
 @@ -116,6 +128,53 @@ public void onException(Throwable var1) {
 
         // Send transactional messages using extRocketMQTemplate
         testExtRocketMQTemplateTransaction();
+
+        // send request in sync mode and receive a reply of String type.
+        String replyString = rocketMQTemplate.sendAndReceive(stringRequestTopic, "request string", String.class);
+        System.out.printf("send %s and receive %s %n", "request string", replyString);
+
+        // send request in sync mode with timeout parameter and receive a reply of byte[] type.
+        byte[] replyBytes = rocketMQTemplate.sendAndReceive(bytesRequestTopic, new Message<String>() {
 
 Review comment:
   It would be better to use `MessageBuilder.withPayload("request byte[]").build()`.


[GitHub] [rocketmq-spring] RongtongJin commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring

Posted by GitBox <gi...@apache.org>.
RongtongJin commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring
URL: https://github.com/apache/rocketmq-spring/pull/209#discussion_r361108693
 
 

 ##########
 File path: rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
 ##########
 @@ -76,6 +80,179 @@ public void setMessageQueueSelector(MessageQueueSelector messageQueueSelector) {
         this.messageQueueSelector = messageQueueSelector;
     }
 
+    public org.apache.rocketmq.common.message.Message requestSync(String destination, Message<?> message) {
+        return requestSync(destination, message, producer.getSendMsgTimeout());
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSync(String destination, Object payload) {
+        return requestSync(destination, payload, producer.getSendMsgTimeout());
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSync(String destination, Message<?> message, long timeout) {
+        return requestSync(destination, message, timeout, 0);
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSync(String destination, Object payload, long timeout) {
+        return requestSync(destination, payload, timeout, 0);
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSync(String destination, Message<?> message, long timeout, int delayLevel) {
+        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
+            log.error("send request message failed. destination:{}, message is null ", destination);
+            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
+        }
+
+        try {
+            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(rocketMQMessageConverter.getMessageConverter(),
+                charset, destination, message);
+            if (delayLevel > 0) {
+                rocketMsg.setDelayTimeLevel(delayLevel);
+            }
+            org.apache.rocketmq.common.message.Message replyMessage;
+            replyMessage = producer.request(rocketMsg, timeout);
+            return replyMessage;
+        } catch (Exception e) {
+            log.error("send request message failed. destination:{}, message:{} ", destination, message);
+            throw new MessagingException(e.getMessage(), e);
+        }
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSync(String destination, Object payload, long timeout, int delayLevel) {
+        Message<?> message = MessageBuilder.withPayload(payload).build();
+        return requestSync(destination, message, timeout, delayLevel);
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Message<?> message, String hashKey) {
+        return requestSyncOrderly(destination, message, hashKey, producer.getSendMsgTimeout());
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Object payload, String hashKey) {
+        return requestSyncOrderly(destination, payload, hashKey, producer.getSendMsgTimeout());
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Message<?> message, String hashKey, long timeout) {
+        return requestSyncOrderly(destination, message, hashKey, timeout, 0);
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Object payload, String hashKey, long timeout) {
+        return requestSyncOrderly(destination, payload, hashKey, timeout, 0);
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Message<?> message, String hashKey, long timeout, int delayLevel) {
+        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
+            log.error("send request message failed. destination:{}, message is null ", destination);
+            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
+        }
+
+        try {
+            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(rocketMQMessageConverter.getMessageConverter(),
+                charset, destination, message);
+            if (delayLevel > 0) {
+                rocketMsg.setDelayTimeLevel(delayLevel);
+            }
+            org.apache.rocketmq.common.message.Message replyMessage;
+            if (Objects.isNull(hashKey) || hashKey.isEmpty()) {
+                replyMessage = producer.request(rocketMsg, timeout);
+            } else {
+                replyMessage = producer.request(rocketMsg, messageQueueSelector, hashKey, timeout);
+            }
+            return replyMessage;
+        } catch (Exception e) {
+            log.error("send request message failed. destination:{}, message:{} ", destination, message);
+            throw new MessagingException(e.getMessage(), e);
+        }
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Object payload, String hashKey, long timeout, int delayLevel) {
+        Message<?> message = MessageBuilder.withPayload(payload).build();
+        return requestSyncOrderly(destination, message, hashKey, timeout, delayLevel);
+    }
+
+    public void requestAsync(String destination, Message<?> message, RequestCallback requestCallback) {
+        requestAsync(destination, message, requestCallback, producer.getSendMsgTimeout());
+    }
+
+    public void requestAsync(String destination, Object payload, RequestCallback requestCallback) {
+        Message<?> message = MessageBuilder.withPayload(payload).build();
+        requestAsync(destination, message, requestCallback, producer.getSendMsgTimeout());
+    }
+
+    public void requestAsync(String destination, Message<?> message, RequestCallback requestCallback, long timeout) {
+        requestAsync(destination, message, requestCallback, timeout, 0);
+    }
+
+    public void requestAsync(String destination, Object payload, RequestCallback requestCallback, long timeout) {
+        requestAsync(destination, payload, requestCallback, timeout, 0);
+    }
+
+    public void requestAsync(String destination, Message<?> message, RequestCallback requestCallback, long timeout, int delayLevel) {
+        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
+            log.error("send request message failed. destination:{}, message is null ", destination);
+            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
+        }
+
+        try {
+            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(rocketMQMessageConverter.getMessageConverter(),
+                charset, destination, message);
+            if (delayLevel > 0) {
+                rocketMsg.setDelayTimeLevel(delayLevel);
+            }
+            producer.request(rocketMsg, requestCallback, timeout);
+        } catch (Exception e) {
+            log.error("send request message failed. destination:{}, message:{} ", destination, message);
+            throw new MessagingException(e.getMessage(), e);
+        }
+    }
+
+    public void requestAsync(String destination, Object payload, RequestCallback requestCallback, long timeout, int delayLevel) {
+        Message<?> message = MessageBuilder.withPayload(payload).build();
+        requestAsync(destination, message, requestCallback, producer.getSendMsgTimeout(), delayLevel);
+    }
+
+    public void requestAsyncOrderly(String destination, Message<?> message, RequestCallback requestCallback, String hashKey) {
+        requestAsyncOrderly(destination, message, requestCallback, hashKey, producer.getSendMsgTimeout());
+    }
+
+    public void requestAsyncOrderly(String destination, Object payload, RequestCallback requestCallback, String hashKey) {
+        requestAsyncOrderly(destination, payload, requestCallback, hashKey, producer.getSendMsgTimeout());
+    }
+
+    public void requestAsyncOrderly(String destination, Message<?> message, RequestCallback requestCallback, String hashKey, long timeout) {
+        requestAsyncOrderly(destination, message, requestCallback, hashKey, timeout, 0);
+    }
+
+    public void requestAsyncOrderly(String destination, Object payload, RequestCallback requestCallback, String hashKey, long timeout) {
+        requestAsyncOrderly(destination, payload, requestCallback, hashKey, timeout, 0);
+    }
+
+    public void requestAsyncOrderly(String destination, Message<?> message, RequestCallback requestCallback, String hashKey, long timeout, int delayLevel) {
+        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
+            log.error("send request message failed. destination:{}, message is null ", destination);
+            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
+        }
+
+        try {
+            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(rocketMQMessageConverter.getMessageConverter(),
+                charset, destination, message);
+            if (delayLevel > 0) {
+                rocketMsg.setDelayTimeLevel(delayLevel);
+            }
+            if (Objects.isNull(hashKey) || hashKey.isEmpty()) {
+                producer.request(rocketMsg, requestCallback, timeout);
+            } else {
+                producer.request(rocketMsg, messageQueueSelector, hashKey, requestCallback, timeout);
+            }
+        } catch (Exception e) {
+            log.error("send request message failed. destination:{}, message:{} ", destination, message);
+            throw new MessagingException(e.getMessage(), e);
+        }
+    }
+
+    public void requestAsyncOrderly(String destination, Object payload, RequestCallback requestCallback, String hashKey, long timeout, int delayLevel) {
+        Message<?> message = MessageBuilder.withPayload(payload).build();
+        requestAsyncOrderly(destination, message, requestCallback, hashKey, producer.getSendMsgTimeout(), delayLevel);
+    }
+
     /**
      * <p> Send message in synchronous mode. This method returns only when the sending procedure totally completes.
      * Reliable synchronous transmission is used in extensive scenes, such as important notification messages, SMS
 
 Review comment:
   IMO, there are too many method names. Keep a single name, such as `sendAndReceive`, and use different parameter lists to distinguish the variants.
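   
   A rough sketch of what a single overloaded name could look like follows. `OverloadSketch`, `DEFAULT_TIMEOUT_MS` and the returned description string are hypothetical and stand in for the real send; this is not the rocketmq-spring API.
   
   ```java
   // Hypothetical sketch of one overloaded name; not the rocketmq-spring API.
   class OverloadSketch {
       // Assumed default, standing in for the producer's configured send timeout.
       static final long DEFAULT_TIMEOUT_MS = 3000;
   
       // Shortest form: default timeout, no hash key.
       static String sendAndReceive(String destination, Object payload) {
           return sendAndReceive(destination, payload, DEFAULT_TIMEOUT_MS);
       }
   
       // Adds a caller-supplied timeout.
       static String sendAndReceive(String destination, Object payload, long timeout) {
           return sendAndReceive(destination, payload, timeout, null);
       }
   
       // Full form: every shorter overload delegates here and forwards its arguments unchanged.
       static String sendAndReceive(String destination, Object payload, long timeout, String hashKey) {
           if (payload == null) {
               throw new IllegalArgumentException("`payload` cannot be null");
           }
           String mode = (hashKey == null || hashKey.isEmpty()) ? "unordered" : "ordered by " + hashKey;
           // Stand-in for the real send: just describe the request.
           return destination + " <- " + payload + " (" + mode + ", timeout " + timeout + " ms)";
       }
   }
   ```
   
   Funnelling every overload into one full form also makes it easier to see whether each argument is forwarded. In the hunk above, the five-argument `Object payload` overloads of `requestAsync` and `requestAsyncOrderly` pass `producer.getSendMsgTimeout()` rather than the caller's `timeout`.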


[GitHub] [rocketmq-spring] vongosling merged pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring

Posted by GitBox <gi...@apache.org>.
vongosling merged pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring
URL: https://github.com/apache/rocketmq-spring/pull/209
 
 
   


[GitHub] [rocketmq-spring] RongtongJin commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring

Posted by GitBox <gi...@apache.org>.
RongtongJin commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring
URL: https://github.com/apache/rocketmq-spring/pull/209#discussion_r364196522
 
 

 ##########
 File path: rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java
 ##########
 @@ -238,4 +297,14 @@ public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
         }
     }
 
+    class RocketMQRequestCallbackImpl_User implements RocketMQLocalRequestCallback<User> {
+        @Override public void onSuccess(User message) {
+            System.out.println("receive User: " + message.toString());
+        }
+
+        @Override public void onException(Throwable e) {
+            e.printStackTrace();
+        }
+    }
 
 Review comment:
   Delete this unused class.


[GitHub] [rocketmq-spring] RongtongJin commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring

Posted by GitBox <gi...@apache.org>.
RongtongJin commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring
URL: https://github.com/apache/rocketmq-spring/pull/209#discussion_r364545688
 
 

 ##########
 File path: rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java
 ##########
 @@ -116,6 +128,53 @@ public void onException(Throwable var1) {
 
         // Send transactional messages using extRocketMQTemplate
         testExtRocketMQTemplateTransaction();
+
+        // send request in sync mode and receive a reply of String type.
+        String replyString = rocketMQTemplate.sendAndReceive(stringRequestTopic, "request string", String.class);
+        System.out.printf("send %s and receive %s %n", "request string", replyString);
+
+        // send request in sync mode with timeout parameter and receive a reply of byte[] type.
+        byte[] replyBytes = rocketMQTemplate.sendAndReceive(bytesRequestTopic, new Message<String>() {
+            @Override public String getPayload() {
+                return "request byte[]";
+            }
+
+            @Override public MessageHeaders getHeaders() {
+                return null;
+            }
+        }, byte[].class, 3000);
+        System.out.printf("send %s and receive %s %n", "request byte[]", replyBytes.toString());
 
 Review comment:
   Use `new String(replyBytes)` instead of `replyBytes.toString()`.
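   
   The difference is easy to reproduce in isolation. In the sketch below, `ByteArrayPrintSketch` and `decode` are hypothetical names, and passing an explicit UTF-8 charset is an addition of this example; `new String(replyBytes)` on its own would use the platform default charset.
   
   ```java
   import java.nio.charset.StandardCharsets;
   
   // Hypothetical demo class; shows why printing a byte[] directly is not useful.
   class ByteArrayPrintSketch {
       // Decodes the reply bytes into text, assuming they are UTF-8 encoded.
       static String decode(byte[] replyBytes) {
           return new String(replyBytes, StandardCharsets.UTF_8);
       }
   
       public static void main(String[] args) {
           byte[] replyBytes = "reply bytes".getBytes(StandardCharsets.UTF_8);
           // Arrays inherit Object.toString(): a type tag plus an identity hash, not the content.
           System.out.println(replyBytes.toString());
           // Decoding yields the actual text.
           System.out.println(decode(replyBytes));
       }
   }
   ```
   
   The first line printed starts with `[B@` followed by a hash that varies between runs; the second is the decoded text.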


[GitHub] [rocketmq-spring] xiangwangcheng commented on issue #209: [ISSUE #208]support request/reply model in rocketmq-spring

Posted by GitBox <gi...@apache.org>.
xiangwangcheng commented on issue #209: [ISSUE #208]support request/reply model in rocketmq-spring
URL: https://github.com/apache/rocketmq-spring/pull/209#issuecomment-570932230
 
 
   I have made some changes after referring to the implementation of Kafka-spring:
   1. Reduce the number of methods.
   2. Use a generic type to receive the reply content.
   3. Add a new interface, `RocketMQReplyListener`, for consuming and replying.
   4. Hide the reply logic on the consumer side.
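   
   As an illustration of the generic reply type in the list above, here is a minimal sketch of converting a raw reply body to the type the caller requests. `TypedReplySketch` and `convertReply` are hypothetical names; the real conversion in rocketmq-spring goes through its message converter, which this sketch does not attempt to reproduce. UTF-8 is assumed for strings.
   
   ```java
   import java.nio.charset.StandardCharsets;
   
   // Hypothetical sketch of a typed reply; not the rocketmq-spring implementation.
   class TypedReplySketch {
       // Converts a raw reply body to the type the caller asked for.
       static <T> T convertReply(byte[] body, Class<T> type) {
           if (type == byte[].class) {
               // Caller wants the raw bytes: hand them back unchanged.
               return type.cast(body);
           }
           if (type == String.class) {
               // Caller wants text: decode, assuming UTF-8.
               return type.cast(new String(body, StandardCharsets.UTF_8));
           }
           // Other types would need a real converter (for example JSON), omitted here.
           throw new IllegalArgumentException("unsupported reply type: " + type.getName());
       }
   }
   ```
   
   Passing `String.class` or `byte[].class` lets the caller receive a typed value directly, which matches the shape of the `sendAndReceive(..., String.class)` calls in the sample application.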


[GitHub] [rocketmq-spring] xiangwangcheng commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring

Posted by GitBox <gi...@apache.org>.
xiangwangcheng commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring
URL: https://github.com/apache/rocketmq-spring/pull/209#discussion_r361122681
 
 

 ##########
 File path: rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/ConsumerWithReply.java
 ##########
 @@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.samples.springboot.consumer;
+
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.utils.MessageUtil;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.core.RocketMQListener;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * RocketMQMessageListener
+ */
+@Service
+@RocketMQMessageListener(topic = "${demo.rocketmq.requestTopic}", consumerGroup = "request_consumer", selectorExpression = "${demo.rocketmq.tag}")
+public class ConsumerWithReply implements RocketMQListener<MessageExt> {
+
+    @Autowired
+    private DefaultMQProducer replyProducer;
+
+    @Override
+    public void onMessage(MessageExt message) {
+        System.out.printf("------- StringConsumer received: %s \n", message);
+        try {
+            String replyTo = MessageUtil.getReplyToClient(message);
+            byte[] replyContent = "reply message contents.".getBytes();
+            // create reply message with given util, do not create reply message by yourself
+            Message replyMessage = MessageUtil.createReplyMessage(message, replyContent);
+
+            // send reply message with producer
+            SendResult replyResult = replyProducer.send(replyMessage, 3000);
+            System.out.printf("reply to %s , %s %n", replyTo, replyResult.toString());
+        }catch(MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
+            System.out.println(e.getLocalizedMessage());
+        }
+    }
+}
 
 Review comment:
   Good idea! I will think about it.


[GitHub] [rocketmq-spring] zongtanghu commented on a change in pull request #209: [ISSUE #208]support request/response model in rocketmq-spring

Posted by GitBox <gi...@apache.org>.
zongtanghu commented on a change in pull request #209: [ISSUE #208]support request/response model in rocketmq-spring
URL: https://github.com/apache/rocketmq-spring/pull/209#discussion_r360810220
 
 

 ##########
 File path: rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
 ##########
 @@ -76,6 +80,179 @@ public void setMessageQueueSelector(MessageQueueSelector messageQueueSelector) {
         this.messageQueueSelector = messageQueueSelector;
     }
 
+    public org.apache.rocketmq.common.message.Message requestSync(String destination, Message<?> message) {
+        return requestSync(destination, message, producer.getSendMsgTimeout());
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSync(String destination, Object payload) {
+        return requestSync(destination, payload, producer.getSendMsgTimeout());
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSync(String destination, Message<?> message, long timeout) {
+        return requestSync(destination, message, timeout, 0);
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSync(String destination, Object payload, long timeout) {
+        return requestSync(destination, payload, timeout, 0);
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSync(String destination, Message<?> message, long timeout, int delayLevel) {
+        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
+            log.error("send request message failed. destination:{}, message is null ", destination);
+            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
+        }
+
+        try {
+            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(rocketMQMessageConverter.getMessageConverter(),
+                charset, destination, message);
+            if (delayLevel > 0) {
+                rocketMsg.setDelayTimeLevel(delayLevel);
+            }
+            org.apache.rocketmq.common.message.Message replyMessage;
+            replyMessage = producer.request(rocketMsg, timeout);
+            return replyMessage;
+        } catch (Exception e) {
+            log.error("send request message failed. destination:{}, message:{} ", destination, message);
+            throw new MessagingException(e.getMessage(), e);
+        }
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSync(String destination, Object payload, long timeout, int delayLevel) {
+        Message<?> message = MessageBuilder.withPayload(payload).build();
+        return requestSync(destination, message, timeout, delayLevel);
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Message<?> message, String hashKey) {
+        return requestSyncOrderly(destination, message, hashKey, producer.getSendMsgTimeout());
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Object payload, String hashKey) {
+        return requestSyncOrderly(destination, payload, hashKey, producer.getSendMsgTimeout());
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Message<?> message, String hashKey, long timeout) {
+        return requestSyncOrderly(destination, message, hashKey, timeout, 0);
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Object payload, String hashKey, long timeout) {
+        return requestSyncOrderly(destination, payload, hashKey, timeout, 0);
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Message<?> message, String hashKey, long timeout, int delayLevel) {
+        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
+            log.error("send request message failed. destination:{}, message is null ", destination);
+            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
+        }
+
+        try {
+            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(rocketMQMessageConverter.getMessageConverter(),
+                charset, destination, message);
+            if (delayLevel > 0) {
+                rocketMsg.setDelayTimeLevel(delayLevel);
+            }
+            org.apache.rocketmq.common.message.Message replyMessage;
 
 Review comment:
   Please also check for the same issue in other places.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [rocketmq-spring] RongtongJin commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring

Posted by GitBox <gi...@apache.org>.
RongtongJin commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring
URL: https://github.com/apache/rocketmq-spring/pull/209#discussion_r363573739
 
 

 ##########
 File path: rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/ConsumerWithReplyGeneric.java
 ##########
 @@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.samples.springboot.consumer;
+
+import org.apache.rocketmq.samples.springboot.domain.ProductWithPayload;
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.core.RocketMQReplyListener;
+import org.springframework.stereotype.Service;
+
+/**
+ * RocketMQMessageListener
+ */
 
 Review comment:
   useless comment


[GitHub] [rocketmq-spring] xiangwangcheng commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring

Posted by GitBox <gi...@apache.org>.
xiangwangcheng commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring
URL: https://github.com/apache/rocketmq-spring/pull/209#discussion_r364212025
 
 

 ##########
 File path: rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java
 ##########
 @@ -238,4 +297,14 @@ public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
         }
     }
 
+    class RocketMQRequestCallbackImpl_User implements RocketMQLocalRequestCallback<User> {
+        @Override public void onSuccess(User message) {
+            System.out.println("receive User: " + message.toString());
+        }
+
+        @Override public void onException(Throwable e) {
+            e.printStackTrace();
+        }
+    }
 
 Review comment:
   fixed


[GitHub] [rocketmq-spring] zongtanghu commented on a change in pull request #209: [ISSUE #208]support request/response model in rocketmq-spring

Posted by GitBox <gi...@apache.org>.
zongtanghu commented on a change in pull request #209: [ISSUE #208]support request/response model in rocketmq-spring
URL: https://github.com/apache/rocketmq-spring/pull/209#discussion_r360811193
 
 

 ##########
 File path: rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
 ##########
 @@ -76,6 +80,179 @@ public void setMessageQueueSelector(MessageQueueSelector messageQueueSelector) {
         this.messageQueueSelector = messageQueueSelector;
     }
 
+    public org.apache.rocketmq.common.message.Message requestSync(String destination, Message<?> message) {
+        return requestSync(destination, message, producer.getSendMsgTimeout());
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSync(String destination, Object payload) {
+        return requestSync(destination, payload, producer.getSendMsgTimeout());
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSync(String destination, Message<?> message, long timeout) {
+        return requestSync(destination, message, timeout, 0);
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSync(String destination, Object payload, long timeout) {
+        return requestSync(destination, payload, timeout, 0);
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSync(String destination, Message<?> message, long timeout, int delayLevel) {
+        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
+            log.error("send request message failed. destination:{}, message is null ", destination);
+            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
+        }
+
+        try {
+            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(rocketMQMessageConverter.getMessageConverter(),
+                charset, destination, message);
+            if (delayLevel > 0) {
+                rocketMsg.setDelayTimeLevel(delayLevel);
+            }
+            org.apache.rocketmq.common.message.Message replyMessage;
+            replyMessage = producer.request(rocketMsg, timeout);
+            return replyMessage;
+        } catch (Exception e) {
+            log.error("send request message failed. destination:{}, message:{} ", destination, message);
+            throw new MessagingException(e.getMessage(), e);
+        }
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSync(String destination, Object payload, long timeout, int delayLevel) {
+        Message<?> message = MessageBuilder.withPayload(payload).build();
+        return requestSync(destination, message, timeout, delayLevel);
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Message<?> message, String hashKey) {
+        return requestSyncOrderly(destination, message, hashKey, producer.getSendMsgTimeout());
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Object payload, String hashKey) {
+        return requestSyncOrderly(destination, payload, hashKey, producer.getSendMsgTimeout());
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Message<?> message, String hashKey, long timeout) {
+        return requestSyncOrderly(destination, message, hashKey, timeout, 0);
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Object payload, String hashKey, long timeout) {
+        return requestSyncOrderly(destination, payload, hashKey, timeout, 0);
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Message<?> message, String hashKey, long timeout, int delayLevel) {
+        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
+            log.error("send request message failed. destination:{}, message is null ", destination);
+            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
+        }
+
+        try {
+            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(rocketMQMessageConverter.getMessageConverter(),
+                charset, destination, message);
+            if (delayLevel > 0) {
+                rocketMsg.setDelayTimeLevel(delayLevel);
+            }
+            org.apache.rocketmq.common.message.Message replyMessage;
+            if (Objects.isNull(hashKey) || hashKey.isEmpty()) {
+                replyMessage = producer.request(rocketMsg, timeout);
+            } else {
+                replyMessage = producer.request(rocketMsg, messageQueueSelector, hashKey, timeout);
+            }
+            return replyMessage;
+        } catch (Exception e) {
+            log.error("send request message failed. destination:{}, message:{} ", destination, message);
+            throw new MessagingException(e.getMessage(), e);
+        }
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Object payload, String hashKey, long timeout, int delayLevel) {
+        Message<?> message = MessageBuilder.withPayload(payload).build();
+        return requestSyncOrderly(destination, message, hashKey, timeout, delayLevel);
+    }
+
+    public void requestAsync(String destination, Message<?> message, RequestCallback requestCallback) {
+        requestAsync(destination, message, requestCallback, producer.getSendMsgTimeout());
+    }
+
+    public void requestAsync(String destination, Object payload, RequestCallback requestCallback) {
+        Message<?> message = MessageBuilder.withPayload(payload).build();
+        requestAsync(destination, message, requestCallback, producer.getSendMsgTimeout());
+    }
+
+    public void requestAsync(String destination, Message<?> message, RequestCallback requestCallback, long timeout) {
+        requestAsync(destination, message, requestCallback, timeout, 0);
+    }
+
+    public void requestAsync(String destination, Object payload, RequestCallback requestCallback, long timeout) {
+        requestAsync(destination, payload, requestCallback, timeout, 0);
+    }
+
+    public void requestAsync(String destination, Message<?> message, RequestCallback requestCallback, long timeout, int delayLevel) {
+        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
+            log.error("send request message failed. destination:{}, message is null ", destination);
+            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
+        }
+
+        try {
+            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(rocketMQMessageConverter.getMessageConverter(),
+                charset, destination, message);
+            if (delayLevel > 0) {
+                rocketMsg.setDelayTimeLevel(delayLevel);
+            }
+            producer.request(rocketMsg, requestCallback, timeout);
+        } catch (Exception e) {
+            log.error("send request message failed. destination:{}, message:{} ", destination, message);
+            throw new MessagingException(e.getMessage(), e);
+        }
+    }
+
+    public void requestAsync(String destination, Object payload, RequestCallback requestCallback, long timeout, int delayLevel) {
+        Message<?> message = MessageBuilder.withPayload(payload).build();
+        requestAsync(destination, message, requestCallback, timeout, delayLevel);
+    }
+
+    public void requestAsyncOrderly(String destination, Message<?> message, RequestCallback requestCallback, String hashKey) {
+        requestAsyncOrderly(destination, message, requestCallback, hashKey, producer.getSendMsgTimeout());
+    }
+
+    public void requestAsyncOrderly(String destination, Object payload, RequestCallback requestCallback, String hashKey) {
+        requestAsyncOrderly(destination, payload, requestCallback, hashKey, producer.getSendMsgTimeout());
+    }
+
+    public void requestAsyncOrderly(String destination, Message<?> message, RequestCallback requestCallback, String hashKey, long timeout) {
+        requestAsyncOrderly(destination, message, requestCallback, hashKey, timeout, 0);
+    }
+
+    public void requestAsyncOrderly(String destination, Object payload, RequestCallback requestCallback, String hashKey, long timeout) {
+        requestAsyncOrderly(destination, payload, requestCallback, hashKey, timeout, 0);
+    }
+
+    public void requestAsyncOrderly(String destination, Message<?> message, RequestCallback requestCallback, String hashKey, long timeout, int delayLevel) {
+        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
+            log.error("send request message failed. destination:{}, message is null ", destination);
+            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
+        }
+
+        try {
+            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(rocketMQMessageConverter.getMessageConverter(),
+                charset, destination, message);
+            if (delayLevel > 0) {
+                rocketMsg.setDelayTimeLevel(delayLevel);
+            }
 
 Review comment:
   In this method, is there a need to check whether `requestCallback` is null?
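
   A minimal sketch of the kind of guard this question is about, not the PR's actual code. `RequestCallbackGuardSketch` and `ReplyCallback` are hypothetical stand-ins for `RocketMQTemplate` and the client's `RequestCallback`, and the real send is replaced by a simple echo.

```java
import java.util.Objects;

// Hypothetical, simplified stand-in for RocketMQTemplate.requestAsyncOrderly;
// the real method converts the message and hands it to the producer.
final class RequestCallbackGuardSketch {

    /** Hypothetical stand-in for the client's RequestCallback interface. */
    interface ReplyCallback {
        void onSuccess(String reply);

        void onException(Throwable e);
    }

    static void requestAsync(String destination, String payload, ReplyCallback callback) {
        if (Objects.isNull(payload)) {
            throw new IllegalArgumentException("`payload` cannot be null");
        }
        // Fail fast on a missing callback, before anything is sent.
        if (Objects.isNull(callback)) {
            throw new IllegalArgumentException("`requestCallback` cannot be null");
        }
        // Placeholder for the real send: echo the payload back as the reply.
        callback.onSuccess("reply-to:" + payload);
    }

    public static void main(String[] args) {
        try {
            requestAsync("demo-topic", "hello", null);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

   Checking up front keeps the failure on the caller's thread with a clear message, rather than surfacing later when a reply arrives and there is nothing to invoke.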


[GitHub] [rocketmq-spring] RongtongJin commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring

Posted by GitBox <gi...@apache.org>.
RongtongJin commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring
URL: https://github.com/apache/rocketmq-spring/pull/209#discussion_r363574525
 
 

 ##########
 File path: rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/ObjectConsumerWithReplyUser.java
 ##########
 @@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.samples.springboot.consumer;
+
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.samples.springboot.domain.User;
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.core.RocketMQReplyListener;
+import org.springframework.stereotype.Service;
+
+/**
+ * Consumer that support request-response model
+ */
 
 Review comment:
   Change `request-response` to `request-reply`.


[GitHub] [rocketmq-spring] RongtongJin commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring

Posted by GitBox <gi...@apache.org>.
RongtongJin commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring
URL: https://github.com/apache/rocketmq-spring/pull/209#discussion_r363581572
 
 

 ##########
 File path: rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
 ##########
 @@ -327,6 +342,119 @@ public void setName(String name) {
         this.name = name;
     }
 
+    public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
+            for (MessageExt messageExt : msgs) {
+                log.debug("received msg: {}", messageExt);
+                try {
+                    long now = System.currentTimeMillis();
+                    handleReplyMessage(messageExt);
+                    long costTime = System.currentTimeMillis() - now;
+                    log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
+                } catch (Exception e) {
+                    log.warn("consume message failed. messageExt:{}, error:{}", messageExt, e);
+                    context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
+                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
+                }
+            }
+
+            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+        }
+    }
+
+    public class DefaultMessageListenerOrderly implements MessageListenerOrderly {
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
+            for (MessageExt messageExt : msgs) {
+                log.debug("received msg: {}", messageExt);
+                try {
+                    long now = System.currentTimeMillis();
+                    handleReplyMessage(messageExt);
+                    long costTime = System.currentTimeMillis() - now;
+                    log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
+                } catch (Exception e) {
+                    log.warn("consume message failed. messageExt:{}", messageExt, e);
+                    context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);
+                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
+                }
+            }
+
+            return ConsumeOrderlyStatus.SUCCESS;
+        }
+    }
+
+    private void handleReplyMessage(
 
 Review comment:
   How about changing the method name? It does more than handle reply messages.
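
   A minimal sketch of the dispatch in question under a neutral name, not the container's actual code. `ListenerDispatchSketch` and `handleMessage` are hypothetical; a `Consumer` stands in for `rocketMQListener` and a `Function` for `rocketMQReplyListener`.

```java
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical, simplified stand-in for the listener container.
final class ListenerDispatchSketch {
    private final Consumer<String> listener;              // stand-in for rocketMQListener
    private final Function<String, String> replyListener; // stand-in for rocketMQReplyListener

    ListenerDispatchSketch(Consumer<String> listener, Function<String, String> replyListener) {
        this.listener = listener;
        this.replyListener = replyListener;
    }

    /**
     * Neutral name: covers both plain consumption and consume-then-reply.
     * Returns the reply content, or null when no reply is produced.
     */
    String handleMessage(String body) {
        if (listener != null) {
            listener.accept(body);
            return null;
        } else if (replyListener != null) {
            return replyListener.apply(body);
        }
        return null;
    }

    public static void main(String[] args) {
        ListenerDispatchSketch replying = new ListenerDispatchSketch(null, body -> "re:" + body);
        System.out.println(replying.handleMessage("ping"));
    }
}
```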


[GitHub] [rocketmq-spring] RongtongJin commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring

Posted by GitBox <gi...@apache.org>.
RongtongJin commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring
URL: https://github.com/apache/rocketmq-spring/pull/209#discussion_r364545900
 
 

 ##########
 File path: rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/ObjectConsumerWithReplyUser.java
 ##########
 @@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.samples.springboot.consumer;
+
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.samples.springboot.domain.User;
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.core.RocketMQReplyListener;
+import org.springframework.stereotype.Service;
+
+/**
+ * The consumer that replying Object
+ */
+@Service
+@RocketMQMessageListener(topic = "${demo.rocketmq.objectRequestTopic}", consumerGroup = "${demo.rocketmq.objectRequestConsumer}", selectorExpression = "${demo.rocketmq.tag}")
+public class ObjectConsumerWithReplyUser implements RocketMQReplyListener<User, User> {
+
+    @Override
+    public User onMessage(User user) {
+        System.out.printf("------- ObjectConsumerWithReplyUser received: %s \n", user);
+        User replyUser = new User();
+        replyUser.setUserAge(Byte.valueOf((byte) 10));
 
 Review comment:
   Use `(byte) 10` instead of `Byte.valueOf((byte) 10)`.
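
   A small sketch of why the shorter form is equivalent, assuming the setter takes a `Byte`. `DemoUser` is a hypothetical stand-in for the sample's `User` class.

```java
// Hypothetical stand-in for the sample's User class, assuming a Byte-typed age field.
final class AutoboxingSketch {

    static final class DemoUser {
        private Byte userAge;

        void setUserAge(Byte userAge) {
            this.userAge = userAge;
        }

        Byte getUserAge() {
            return userAge;
        }
    }

    public static void main(String[] args) {
        DemoUser explicit = new DemoUser();
        explicit.setUserAge(Byte.valueOf((byte) 10)); // explicit boxing

        DemoUser implicit = new DemoUser();
        implicit.setUserAge((byte) 10);               // boxed automatically by the compiler

        System.out.println(explicit.getUserAge().equals(implicit.getUserAge())); // prints true
    }
}
```

   The cast to `byte` is still required, because an `int` literal is not boxed to `Byte` when passed as a method argument.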


[GitHub] [rocketmq-spring] vongosling commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring

Posted by GitBox <gi...@apache.org>.
vongosling commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring
URL: https://github.com/apache/rocketmq-spring/pull/209#discussion_r361248179
 
 

 ##########
 File path: rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/ConsumerWithReply.java
 ##########
 @@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.samples.springboot.consumer;
+
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.utils.MessageUtil;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.core.RocketMQListener;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * RocketMQMessageListener
+ */
+@Service
+@RocketMQMessageListener(topic = "${demo.rocketmq.requestTopic}", consumerGroup = "request_consumer", selectorExpression = "${demo.rocketmq.tag}")
 
 Review comment:
   Would it be better to use a placeholder instead of the concrete group `request_consumer`?


[GitHub] [rocketmq-spring] vongosling commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring

Posted by GitBox <gi...@apache.org>.
vongosling commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring
URL: https://github.com/apache/rocketmq-spring/pull/209#discussion_r361248036
 
 

 ##########
 File path: rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/ConsumerWithReply.java
 ##########
 @@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.samples.springboot.consumer;
+
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.utils.MessageUtil;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.core.RocketMQListener;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * RocketMQMessageListener
 
 Review comment:
   This comment is not useful. Could you add a more concrete comment for this class?


[GitHub] [rocketmq-spring] xiangwangcheng commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring

Posted by GitBox <gi...@apache.org>.
xiangwangcheng commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring
URL: https://github.com/apache/rocketmq-spring/pull/209#discussion_r363598225
 
 

 ##########
 File path: rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
 ##########
 @@ -327,6 +342,119 @@ public void setName(String name) {
         this.name = name;
     }
 
+    public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
+            for (MessageExt messageExt : msgs) {
+                log.debug("received msg: {}", messageExt);
+                try {
+                    long now = System.currentTimeMillis();
+                    handleReplyMessage(messageExt);
+                    long costTime = System.currentTimeMillis() - now;
+                    log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
+                } catch (Exception e) {
+                    log.warn("consume message failed. messageExt:{}, error:{}", messageExt, e);
+                    context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
+                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
+                }
+            }
+
+            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+        }
+    }
+
+    public class DefaultMessageListenerOrderly implements MessageListenerOrderly {
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
+            for (MessageExt messageExt : msgs) {
+                log.debug("received msg: {}", messageExt);
+                try {
+                    long now = System.currentTimeMillis();
+                    handleReplyMessage(messageExt);
+                    long costTime = System.currentTimeMillis() - now;
+                    log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
+                } catch (Exception e) {
+                    log.warn("consume message failed. messageExt:{}", messageExt, e);
+                    context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);
+                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
+                }
+            }
+
+            return ConsumeOrderlyStatus.SUCCESS;
+        }
+    }
+
+    private void handleReplyMessage(
+        MessageExt messageExt) throws MQClientException, RemotingException, InterruptedException {
+        if (rocketMQListener != null) {
+            rocketMQListener.onMessage(doConvertMessage(messageExt));
+        } else if (rocketMQReplyListener != null) {
+            Object replyContent = rocketMQReplyListener.onMessage(doConvertMessage(messageExt));
+            Message<?> message = MessageBuilder.withPayload(replyContent).build();
+
+            org.apache.rocketmq.common.message.Message replyMessage = MessageUtil.createReplyMessage(messageExt, convertToBytes(message));
+            consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(replyMessage, new SendCallback() {
+                @Override public void onSuccess(SendResult sendResult) {
+                    if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
+                        log.error("Consumer replys message failed. SendStatus: {}", sendResult.getSendStatus());
+                    } else {
+                        log.info("Consumer replys message success.");
+                    }
+                }
+
+                @Override public void onException(Throwable e) {
+                    log.error("Consumer replys message failed. error: {}", e.getLocalizedMessage());
 
 Review comment:
   Is `replies` correct?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [rocketmq-spring] xiangwangcheng commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring

Posted by GitBox <gi...@apache.org>.
xiangwangcheng commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring
URL: https://github.com/apache/rocketmq-spring/pull/209#discussion_r365110246
 
 

 ##########
 File path: rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/ObjectConsumerWithReplyUser.java
 ##########
 @@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.samples.springboot.consumer;
+
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.samples.springboot.domain.User;
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.core.RocketMQReplyListener;
+import org.springframework.stereotype.Service;
+
+/**
+ * The consumer that replying Object
+ */
+@Service
+@RocketMQMessageListener(topic = "${demo.rocketmq.objectRequestTopic}", consumerGroup = "${demo.rocketmq.objectRequestConsumer}", selectorExpression = "${demo.rocketmq.tag}")
+public class ObjectConsumerWithReplyUser implements RocketMQReplyListener<User, User> {
+
+    @Override
+    public User onMessage(User user) {
+        System.out.printf("------- ObjectConsumerWithReplyUser received: %s \n", user);
+        User replyUser = new User();
+        replyUser.setUserAge(Byte.valueOf((byte) 10));
 
 Review comment:
   @RongtongJin All fixed.


[GitHub] [rocketmq-spring] RongtongJin commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring

Posted by GitBox <gi...@apache.org>.
RongtongJin commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring
URL: https://github.com/apache/rocketmq-spring/pull/209#discussion_r363580022
 
 

 ##########
 File path: rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQReplyListener.java
 ##########
 @@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.spring.core;
+
+/**
+ * The consumer supported request-response model should implement this interface.
+ *
+ * @param <T> the type received by the listener
 
 Review comment:
   I think we should also document the type parameter `<R>`.
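
   A minimal sketch of what the requested Javadoc could look like. The interface below is a renamed stand-in (`ReplyListenerSketch` and `LengthReplyListener` are hypothetical names, not classes from the PR); only the `onMessage` shape is taken from the sample consumers quoted in this thread, and the wording of the tags is an assumption.

```java
/**
 * Stand-in for the reply listener interface under review
 * (hypothetical copy, renamed so it cannot clash with the real class).
 *
 * @param <T> the type of the request payload received by the listener
 * @param <R> the type of the reply payload returned to the requester
 */
interface ReplyListenerSketch<T, R> {
    /**
     * Handles one request and produces the reply.
     *
     * @param message the converted request payload
     * @return the reply payload to send back to the requester
     */
    R onMessage(T message);
}

/** Example implementation: receives a String and replies with its length. */
class LengthReplyListener implements ReplyListenerSketch<String, Integer> {
    @Override
    public Integer onMessage(String message) {
        return message.length();
    }
}
```

   Documenting both parameters makes it clear at the declaration site which type is consumed and which one is sent back.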


[GitHub] [rocketmq-spring] zongtanghu commented on issue #209: [ISSUE #208]support request/response model in rocketmq-spring

Posted by GitBox <gi...@apache.org>.
zongtanghu commented on issue #209: [ISSUE #208]support request/response model in rocketmq-spring
URL: https://github.com/apache/rocketmq-spring/pull/209#issuecomment-568404139
 
 
   Please add some test cases to improve code coverage, thanks.


[GitHub] [rocketmq-spring] RongtongJin commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring

Posted by GitBox <gi...@apache.org>.
RongtongJin commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring
URL: https://github.com/apache/rocketmq-spring/pull/209#discussion_r363581165
 
 

 ##########
 File path: rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java
 ##########
 @@ -116,6 +128,44 @@ public void onException(Throwable var1) {
 
         // Send transactional messages using extRocketMQTemplate
         testExtRocketMQTemplateTransaction();
+
+        // send request in sync mode and receive a reply of String type.
+        String replyString = rocketMQTemplate.sendAndReceive(stringRequestTopic, "request string", String.class);
+        System.out.printf("send %s and receive %s %n", "request string", replyString);
+
+        // send request in sync mode with timeout parameter and receive a reply of byte[] type.
+        byte[] replyBytes = rocketMQTemplate.sendAndReceive(bytesRequestTopic, new Message<String>() {
+            @Override public String getPayload() {
+                return "request byte[]";
+            }
+
+            @Override public MessageHeaders getHeaders() {
+                return null;
+            }
+        }, byte[].class, 3000);
+        System.out.printf("send %s and receive %s %n", "request byte[]", replyBytes.toString());
+
+        // send request in sync mode with hashKey parameter and receive a reply of User type.
+        User requestUser = new User().setUserAge(Byte.valueOf((byte) 9)).setUserName("requestUserName");
+        User replyUser = rocketMQTemplate.sendAndReceive(objectRequestTopic, requestUser, User.class, "order-id");
+        System.out.printf("send %s and receive %s %n", requestUser, replyUser);
+        // send request in sync mode with timeout and delayLevel parameter parameter and receive a reply of generic type.
+        ProductWithPayload<String> replyGenericObject = rocketMQTemplate.sendAndReceive(genericRequestTopic, "request generic",
+            new TypeReference<ProductWithPayload<String>>() {
+            }.getType(), 30000, 2);
+        System.out.printf("send %s and receive %s %n", "request generic", replyGenericObject);
+
+        // send request in async mode and receive a reply of String type.
+        rocketMQTemplate.sendAndReceive(stringRequestTopic, "request string", new RequestCallback() {
+            @Override public void onSuccess(org.apache.rocketmq.common.message.Message message) {
+                System.out.print("receive reply content in callback: " + message.toString());
+            }
+
+            @Override public void onException(Throwable e) {
+                e.printStackTrace();
+            }
+        });
 
 Review comment:
   [Important] We need to replace `RequestCallback`, because it exposes the RocketMQ `Message` type to users.
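
   One possible shape for such a replacement, as a sketch only. Every type here (`RawMessage`, `RawRequestCallback`, `TypedReplyCallback`, `CallbackAdapter`) is a hypothetical stand-in defined for this example, not an API of RocketMQ or rocketmq-spring. The idea is that user code only sees the converted payload type, and the template adapts to the client-level callback internally.

```java
import java.util.function.Function;

/** Hypothetical stand-in for the client's raw message type; only the body matters here. */
class RawMessage {
    private final byte[] body;

    RawMessage(byte[] body) {
        this.body = body;
    }

    byte[] getBody() {
        return body;
    }
}

/** Hypothetical stand-in for the client-level callback that exposes the raw message. */
interface RawRequestCallback {
    void onSuccess(RawMessage message);

    void onException(Throwable e);
}

/** Hypothetical user-facing callback: users only see the converted payload type T. */
interface TypedReplyCallback<T> {
    void onSuccess(T reply);

    void onException(Throwable e);
}

/** Adapts a typed callback to the raw one, so the raw type stays inside the template. */
final class CallbackAdapter {
    private CallbackAdapter() {
    }

    static <T> RawRequestCallback adapt(TypedReplyCallback<T> callback, Function<byte[], T> converter) {
        return new RawRequestCallback() {
            @Override public void onSuccess(RawMessage message) {
                T reply;
                try {
                    reply = converter.apply(message.getBody());
                } catch (RuntimeException e) {
                    // Conversion failures are reported through the same error path.
                    callback.onException(e);
                    return;
                }
                callback.onSuccess(reply);
            }

            @Override public void onException(Throwable e) {
                callback.onException(e);
            }
        };
    }
}
```

   With an adapter like this, the async `sendAndReceive` overloads could accept the typed callback and keep `org.apache.rocketmq.common.message.Message` out of user code.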


[GitHub] [rocketmq-spring] RongtongJin commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring

Posted by GitBox <gi...@apache.org>.
RongtongJin commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring
URL: https://github.com/apache/rocketmq-spring/pull/209#discussion_r363575118
 
 

 ##########
 File path: rocketmq-spring-boot-samples/rocketmq-consume-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/StringConsumerWithReplyString.java
 ##########
 @@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.samples.springboot.consumer;
+
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.core.RocketMQReplyListener;
+import org.springframework.stereotype.Service;
+
+/**
+ * Consumer that support request-response model
+ */
 
 Review comment:
   Same as above


[GitHub] [rocketmq-spring] xiangwangcheng commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring

Posted by GitBox <gi...@apache.org>.
xiangwangcheng commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring
URL: https://github.com/apache/rocketmq-spring/pull/209#discussion_r361106425
 
 

 ##########
 File path: rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
 ##########
 @@ -76,6 +80,179 @@ public void setMessageQueueSelector(MessageQueueSelector messageQueueSelector) {
         this.messageQueueSelector = messageQueueSelector;
     }
 
+    public org.apache.rocketmq.common.message.Message requestSync(String destination, Message<?> message) {
+        return requestSync(destination, message, producer.getSendMsgTimeout());
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSync(String destination, Object payload) {
+        return requestSync(destination, payload, producer.getSendMsgTimeout());
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSync(String destination, Message<?> message, long timeout) {
+        return requestSync(destination, message, timeout, 0);
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSync(String destination, Object payload, long timeout) {
+        return requestSync(destination, payload, timeout, 0);
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSync(String destination, Message<?> message, long timeout, int delayLevel) {
+        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
+            log.error("send request message failed. destination:{}, message is null ", destination);
+            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
+        }
+
+        try {
+            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(rocketMQMessageConverter.getMessageConverter(),
+                charset, destination, message);
+            if (delayLevel > 0) {
+                rocketMsg.setDelayTimeLevel(delayLevel);
+            }
+            org.apache.rocketmq.common.message.Message replyMessage;
+            replyMessage = producer.request(rocketMsg, timeout);
+            return replyMessage;
+        } catch (Exception e) {
+            log.error("send request message failed. destination:{}, message:{} ", destination, message);
+            throw new MessagingException(e.getMessage(), e);
+        }
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSync(String destination, Object payload, long timeout, int delayLevel) {
+        Message<?> message = MessageBuilder.withPayload(payload).build();
+        return requestSync(destination, message, timeout, delayLevel);
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Message<?> message, String hashKey) {
+        return requestSyncOrderly(destination, message, hashKey, producer.getSendMsgTimeout());
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Object payload, String hashKey) {
+        return requestSyncOrderly(destination, payload, hashKey, producer.getSendMsgTimeout());
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Message<?> message, String hashKey, long timeout) {
+        return requestSyncOrderly(destination, message, hashKey, timeout, 0);
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Object payload, String hashKey, long timeout) {
+        return requestSyncOrderly(destination, payload, hashKey, timeout, 0);
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Message<?> message, String hashKey, long timeout, int delayLevel) {
+        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
+            log.error("send request message failed. destination:{}, message is null ", destination);
+            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
+        }
+
+        try {
+            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(rocketMQMessageConverter.getMessageConverter(),
+                charset, destination, message);
+            if (delayLevel > 0) {
+                rocketMsg.setDelayTimeLevel(delayLevel);
+            }
+            org.apache.rocketmq.common.message.Message replyMessage;
+            if (Objects.isNull(hashKey) || hashKey.isEmpty()) {
+                replyMessage = producer.request(rocketMsg, timeout);
+            } else {
+                replyMessage = producer.request(rocketMsg, messageQueueSelector, hashKey, timeout);
+            }
+            return replyMessage;
+        } catch (Exception e) {
+            log.error("send request message failed. destination:{}, message:{} ", destination, message);
+            throw new MessagingException(e.getMessage(), e);
+        }
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Object payload, String hashKey, long timeout, int delayLevel) {
+        Message<?> message = MessageBuilder.withPayload(payload).build();
+        return requestSyncOrderly(destination, message, hashKey, timeout, delayLevel);
+    }
+
+    public void requestAsync(String destination, Message<?> message, RequestCallback requestCallback) {
+        requestAsync(destination, message, requestCallback, producer.getSendMsgTimeout());
+    }
+
+    public void requestAsync(String destination, Object payload, RequestCallback requestCallback) {
+        Message<?> message = MessageBuilder.withPayload(payload).build();
+        requestAsync(destination, message, requestCallback, producer.getSendMsgTimeout());
+    }
+
+    public void requestAsync(String destination, Message<?> message, RequestCallback requestCallback, long timeout) {
+        requestAsync(destination, message, requestCallback, timeout, 0);
+    }
+
+    public void requestAsync(String destination, Object payload, RequestCallback requestCallback, long timeout) {
+        requestAsync(destination, payload, requestCallback, timeout, 0);
+    }
+
+    public void requestAsync(String destination, Message<?> message, RequestCallback requestCallback, long timeout, int delayLevel) {
+        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
+            log.error("send request message failed. destination:{}, message is null ", destination);
+            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
+        }
+
+        try {
+            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(rocketMQMessageConverter.getMessageConverter(),
+                charset, destination, message);
+            if (delayLevel > 0) {
+                rocketMsg.setDelayTimeLevel(delayLevel);
+            }
+            producer.request(rocketMsg, requestCallback, timeout);
+        } catch (Exception e) {
+            log.error("send request message failed. destination:{}, message:{} ", destination, message);
+            throw new MessagingException(e.getMessage(), e);
+        }
+    }
+
+    public void requestAsync(String destination, Object payload, RequestCallback requestCallback, long timeout, int delayLevel) {
+        Message<?> message = MessageBuilder.withPayload(payload).build();
+        requestAsync(destination, message, requestCallback, producer.getSendMsgTimeout(), delayLevel);
+    }
+
+    public void requestAsyncOrderly(String destination, Message<?> message, RequestCallback requestCallback, String hashKey) {
+        requestAsyncOrderly(destination, message, requestCallback, hashKey, producer.getSendMsgTimeout());
+    }
+
+    public void requestAsyncOrderly(String destination, Object payload, RequestCallback requestCallback, String hashKey) {
+        requestAsyncOrderly(destination, payload, requestCallback, hashKey, producer.getSendMsgTimeout());
+    }
+
+    public void requestAsyncOrderly(String destination, Message<?> message, RequestCallback requestCallback, String hashKey, long timeout) {
+        requestAsyncOrderly(destination, message, requestCallback, hashKey, timeout, 0);
+    }
+
+    public void requestAsyncOrderly(String destination, Object payload, RequestCallback requestCallback, String hashKey, long timeout) {
+        requestAsyncOrderly(destination, payload, requestCallback, hashKey, timeout, 0);
+    }
+
+    public void requestAsyncOrderly(String destination, Message<?> message, RequestCallback requestCallback, String hashKey, long timeout, int delayLevel) {
+        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
+            log.error("send request message failed. destination:{}, message is null ", destination);
+            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
+        }
+
+        try {
+            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(rocketMQMessageConverter.getMessageConverter(),
+                charset, destination, message);
+            if (delayLevel > 0) {
+                rocketMsg.setDelayTimeLevel(delayLevel);
+            }
 
 Review comment:
   I think null should be allowed here, because there may be cases where nothing needs to be done when the response comes back.


[GitHub] [rocketmq-spring] RongtongJin commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring

Posted by GitBox <gi...@apache.org>.
RongtongJin commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring
URL: https://github.com/apache/rocketmq-spring/pull/209#discussion_r364193567
 
 

 ##########
 File path: rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java
 ##########
 @@ -116,6 +128,53 @@ public void onException(Throwable var1) {
 
         // Send transactional messages using extRocketMQTemplate
         testExtRocketMQTemplateTransaction();
+
+        // send request in sync mode and receive a reply of String type.
+        String replyString = rocketMQTemplate.sendAndReceive(stringRequestTopic, "request string", String.class);
+        System.out.printf("send %s and receive %s %n", "request string", replyString);
+
+        // send request in sync mode with timeout parameter and receive a reply of byte[] type.
+        byte[] replyBytes = rocketMQTemplate.sendAndReceive(bytesRequestTopic, new Message<String>() {
+            @Override public String getPayload() {
+                return "request byte[]";
+            }
+
+            @Override public MessageHeaders getHeaders() {
+                return null;
+            }
+        }, byte[].class, 3000);
+        System.out.printf("send %s and receive %s %n", "request byte[]", replyBytes.toString());
+
+        // send request in sync mode with hashKey parameter and receive a reply of User type.
+        User requestUser = new User().setUserAge(Byte.valueOf((byte) 9)).setUserName("requestUserName");
 
 Review comment:
   Byte.valueOf((byte) 9) --> (byte) 9
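
   A small sketch of why the shorter form is equivalent. `UserSketch` is a hypothetical stand-in for the sample's `User` class, and it assumes the setter takes a `Byte`; under that assumption the compiler boxes `(byte) 9` automatically.

```java
/** Minimal stand-in for the sample's User class (hypothetical; only the age field is modeled). */
class UserSketch {
    private Byte userAge;

    UserSketch setUserAge(Byte userAge) {
        this.userAge = userAge;
        return this;
    }

    Byte getUserAge() {
        return userAge;
    }
}

class AutoboxingDemo {
    /** Explicit boxing, as written in the diff. */
    static Byte explicit() {
        return new UserSketch().setUserAge(Byte.valueOf((byte) 9)).getUserAge();
    }

    /** Autoboxing: the compiler inserts the boxing conversion itself. */
    static Byte autoboxed() {
        return new UserSketch().setUserAge((byte) 9).getUserAge();
    }
}
```

   Both methods return an equal `Byte`, so the explicit `Byte.valueOf` call only adds noise.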


[GitHub] [rocketmq-spring] RongtongJin commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring

Posted by GitBox <gi...@apache.org>.
RongtongJin commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring
URL: https://github.com/apache/rocketmq-spring/pull/209#discussion_r363570424
 
 

 ##########
 File path: rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
 ##########
 @@ -327,6 +342,119 @@ public void setName(String name) {
         this.name = name;
     }
 
+    public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
+            for (MessageExt messageExt : msgs) {
+                log.debug("received msg: {}", messageExt);
+                try {
+                    long now = System.currentTimeMillis();
+                    handleReplyMessage(messageExt);
+                    long costTime = System.currentTimeMillis() - now;
+                    log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
+                } catch (Exception e) {
+                    log.warn("consume message failed. messageExt:{}, error:{}", messageExt, e);
+                    context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
+                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
+                }
+            }
+
+            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+        }
+    }
+
+    public class DefaultMessageListenerOrderly implements MessageListenerOrderly {
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
+            for (MessageExt messageExt : msgs) {
+                log.debug("received msg: {}", messageExt);
+                try {
+                    long now = System.currentTimeMillis();
+                    handleReplyMessage(messageExt);
+                    long costTime = System.currentTimeMillis() - now;
+                    log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
+                } catch (Exception e) {
+                    log.warn("consume message failed. messageExt:{}", messageExt, e);
+                    context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);
+                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
+                }
+            }
+
+            return ConsumeOrderlyStatus.SUCCESS;
+        }
+    }
+
+    private void handleReplyMessage(
+        MessageExt messageExt) throws MQClientException, RemotingException, InterruptedException {
+        if (rocketMQListener != null) {
+            rocketMQListener.onMessage(doConvertMessage(messageExt));
+        } else if (rocketMQReplyListener != null) {
+            Object replyContent = rocketMQReplyListener.onMessage(doConvertMessage(messageExt));
+            Message<?> message = MessageBuilder.withPayload(replyContent).build();
+
+            org.apache.rocketmq.common.message.Message replyMessage = MessageUtil.createReplyMessage(messageExt, convertToBytes(message));
+            consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(replyMessage, new SendCallback() {
+                @Override public void onSuccess(SendResult sendResult) {
+                    if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
+                        log.error("Consumer replys message failed. SendStatus: {}", sendResult.getSendStatus());
+                    } else {
+                        log.info("Consumer replys message success.");
+                    }
+                }
+
+                @Override public void onException(Throwable e) {
+                    log.error("Consumer replys message failed. error: {}", e.getLocalizedMessage());
 
 Review comment:
   Consumer replys message -> Consumer reply message


[GitHub] [rocketmq-spring] RongtongJin commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring

Posted by GitBox <gi...@apache.org>.
RongtongJin commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring
URL: https://github.com/apache/rocketmq-spring/pull/209#discussion_r363580603
 
 

 ##########
 File path: rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
 ##########
 @@ -537,4 +877,49 @@ public TransactionSendResult sendMessageInTransaction(final String destination,
             destination, msg);
     }
 
+    private Message<?> validateMessageAndPayload(Message<?> message, Object payload) {
+        if (Objects.nonNull(message) && Objects.nonNull(payload)) {
+            log.error("`message` and `payload` cannot exist at the same time.");
+            throw new IllegalArgumentException("`message` and `payload` cannot exist at the same time");
+        }
+
+        if (Objects.nonNull(payload)) {
+            message = MessageBuilder.withPayload(payload).build();
+        }
+
+        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
+            log.error("send request message failed. Either `message` or `payload` needed.");
+            throw new IllegalArgumentException("either `message` or `payload` needed.");
+        }
+        return message;
+    }
 
 Review comment:
   Delete this unused method.


[GitHub] [rocketmq-spring] xiangwangcheng commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring

Posted by GitBox <gi...@apache.org>.
xiangwangcheng commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring
URL: https://github.com/apache/rocketmq-spring/pull/209#discussion_r363598480
 
 

 ##########
 File path: rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java
 ##########
 @@ -116,6 +128,44 @@ public void onException(Throwable var1) {
 
         // Send transactional messages using extRocketMQTemplate
         testExtRocketMQTemplateTransaction();
+
+        // send request in sync mode and receive a reply of String type.
+        String replyString = rocketMQTemplate.sendAndReceive(stringRequestTopic, "request string", String.class);
+        System.out.printf("send %s and receive %s %n", "request string", replyString);
+
+        // send request in sync mode with timeout parameter and receive a reply of byte[] type.
+        byte[] replyBytes = rocketMQTemplate.sendAndReceive(bytesRequestTopic, new Message<String>() {
+            @Override public String getPayload() {
+                return "request byte[]";
+            }
+
+            @Override public MessageHeaders getHeaders() {
+                return null;
+            }
+        }, byte[].class, 3000);
+        System.out.printf("send %s and receive %s %n", "request byte[]", replyBytes.toString());
+
+        // send request in sync mode with hashKey parameter and receive a reply of User type.
+        User requestUser = new User().setUserAge(Byte.valueOf((byte) 9)).setUserName("requestUserName");
+        User replyUser = rocketMQTemplate.sendAndReceive(objectRequestTopic, requestUser, User.class, "order-id");
+        System.out.printf("send %s and receive %s %n", requestUser, replyUser);
+        // send request in sync mode with timeout and delayLevel parameters and receive a reply of generic type.
+        ProductWithPayload<String> replyGenericObject = rocketMQTemplate.sendAndReceive(genericRequestTopic, "request generic",
+            new TypeReference<ProductWithPayload<String>>() {
+            }.getType(), 30000, 2);
+        System.out.printf("send %s and receive %s %n", "request generic", replyGenericObject);
+
+        // send request in async mode and receive a reply of String type.
+        rocketMQTemplate.sendAndReceive(stringRequestTopic, "request string", new RequestCallback() {
+            @Override public void onSuccess(org.apache.rocketmq.common.message.Message message) {
+                System.out.print("receive reply content in callback: " + message.toString());
+            }
+
+            @Override public void onException(Throwable e) {
+                e.printStackTrace();
+            }
+        });
 
 Review comment:
   Thanks for these comments. I will consider them and make another commit.


[GitHub] [rocketmq-spring] zongtanghu commented on a change in pull request #209: [ISSUE #208]support request/response model in rocketmq-spring

Posted by GitBox <gi...@apache.org>.
zongtanghu commented on a change in pull request #209: [ISSUE #208]support request/response model in rocketmq-spring
URL: https://github.com/apache/rocketmq-spring/pull/209#discussion_r360808781
 
 

 ##########
 File path: rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
 ##########
 @@ -76,6 +80,179 @@ public void setMessageQueueSelector(MessageQueueSelector messageQueueSelector) {
         this.messageQueueSelector = messageQueueSelector;
     }
 
+    public org.apache.rocketmq.common.message.Message requestSync(String destination, Message<?> message) {
+        return requestSync(destination, message, producer.getSendMsgTimeout());
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSync(String destination, Object payload) {
+        return requestSync(destination, payload, producer.getSendMsgTimeout());
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSync(String destination, Message<?> message, long timeout) {
+        return requestSync(destination, message, timeout, 0);
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSync(String destination, Object payload, long timeout) {
+        return requestSync(destination, payload, timeout, 0);
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSync(String destination, Message<?> message, long timeout, int delayLevel) {
+        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
+            log.error("send request message failed. destination:{}, message is null ", destination);
+            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
+        }
+
+        try {
+            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(rocketMQMessageConverter.getMessageConverter(),
+                charset, destination, message);
+            if (delayLevel > 0) {
+                rocketMsg.setDelayTimeLevel(delayLevel);
+            }
+            org.apache.rocketmq.common.message.Message replyMessage;
+            replyMessage = producer.request(rocketMsg, timeout);
+            return replyMessage;
+        } catch (Exception e) {
+            log.error("send request message failed. destination:{}, message:{} ", destination, message);
+            throw new MessagingException(e.getMessage(), e);
+        }
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSync(String destination, Object payload, long timeout, int delayLevel) {
+        Message<?> message = MessageBuilder.withPayload(payload).build();
+        return requestSync(destination, message, timeout, delayLevel);
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Message<?> message, String hashKey) {
+        return requestSyncOrderly(destination, message, hashKey, producer.getSendMsgTimeout());
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Object payload, String hashKey) {
+        return requestSyncOrderly(destination, payload, hashKey, producer.getSendMsgTimeout());
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Message<?> message, String hashKey, long timeout) {
+        return requestSyncOrderly(destination, message, hashKey, timeout, 0);
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Object payload, String hashKey, long timeout) {
+        return requestSyncOrderly(destination, payload, hashKey, timeout, 0);
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Message<?> message, String hashKey, long timeout, int delayLevel) {
+        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
+            log.error("send request message failed. destination:{}, message is null ", destination);
+            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
+        }
+
+        try {
+            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(rocketMQMessageConverter.getMessageConverter(),
+                charset, destination, message);
+            if (delayLevel > 0) {
+                rocketMsg.setDelayTimeLevel(delayLevel);
+            }
+            org.apache.rocketmq.common.message.Message replyMessage;
 
 Review comment:
   The `replyMessage` variable can be initialized here.


[GitHub] [rocketmq-spring] RongtongJin commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring

Posted by GitBox <gi...@apache.org>.
RongtongJin commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring
URL: https://github.com/apache/rocketmq-spring/pull/209#discussion_r361103525
 
 

 ##########
 File path: rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
 ##########
 @@ -76,6 +80,179 @@ public void setMessageQueueSelector(MessageQueueSelector messageQueueSelector) {
         this.messageQueueSelector = messageQueueSelector;
     }
 
+    public org.apache.rocketmq.common.message.Message requestSync(String destination, Message<?> message) {
+        return requestSync(destination, message, producer.getSendMsgTimeout());
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSync(String destination, Object payload) {
+        return requestSync(destination, payload, producer.getSendMsgTimeout());
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSync(String destination, Message<?> message, long timeout) {
+        return requestSync(destination, message, timeout, 0);
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSync(String destination, Object payload, long timeout) {
+        return requestSync(destination, payload, timeout, 0);
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSync(String destination, Message<?> message, long timeout, int delayLevel) {
+        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
+            log.error("send request message failed. destination:{}, message is null ", destination);
+            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
+        }
+
+        try {
+            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(rocketMQMessageConverter.getMessageConverter(),
+                charset, destination, message);
+            if (delayLevel > 0) {
+                rocketMsg.setDelayTimeLevel(delayLevel);
+            }
+            org.apache.rocketmq.common.message.Message replyMessage;
+            replyMessage = producer.request(rocketMsg, timeout);
+            return replyMessage;
+        } catch (Exception e) {
+            log.error("send request message failed. destination:{}, message:{} ", destination, message);
+            throw new MessagingException(e.getMessage(), e);
+        }
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSync(String destination, Object payload, long timeout, int delayLevel) {
+        Message<?> message = MessageBuilder.withPayload(payload).build();
+        return requestSync(destination, message, timeout, delayLevel);
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Message<?> message, String hashKey) {
+        return requestSyncOrderly(destination, message, hashKey, producer.getSendMsgTimeout());
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Object payload, String hashKey) {
+        return requestSyncOrderly(destination, payload, hashKey, producer.getSendMsgTimeout());
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Message<?> message, String hashKey, long timeout) {
+        return requestSyncOrderly(destination, message, hashKey, timeout, 0);
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Object payload, String hashKey, long timeout) {
+        return requestSyncOrderly(destination, payload, hashKey, timeout, 0);
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Message<?> message, String hashKey, long timeout, int delayLevel) {
+        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
+            log.error("send request message failed. destination:{}, message is null ", destination);
+            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
+        }
+
+        try {
+            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(rocketMQMessageConverter.getMessageConverter(),
+                charset, destination, message);
+            if (delayLevel > 0) {
+                rocketMsg.setDelayTimeLevel(delayLevel);
+            }
+            org.apache.rocketmq.common.message.Message replyMessage;
+            if (Objects.isNull(hashKey) || hashKey.isEmpty()) {
+                replyMessage = producer.request(rocketMsg, timeout);
+            } else {
+                replyMessage = producer.request(rocketMsg, messageQueueSelector, hashKey, timeout);
+            }
+            return replyMessage;
+        } catch (Exception e) {
+            log.error("send request message failed. destination:{}, message:{} ", destination, message);
+            throw new MessagingException(e.getMessage(), e);
+        }
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Object payload, String hashKey, long timeout, int delayLevel) {
+        Message<?> message = MessageBuilder.withPayload(payload).build();
+        return requestSyncOrderly(destination, message, hashKey, timeout, delayLevel);
+    }
+
+    public void requestAsync(String destination, Message<?> message, RequestCallback requestCallback) {
+        requestAsync(destination, message, requestCallback, producer.getSendMsgTimeout());
+    }
+
+    public void requestAsync(String destination, Object payload, RequestCallback requestCallback) {
+        Message<?> message = MessageBuilder.withPayload(payload).build();
+        requestAsync(destination, message, requestCallback, producer.getSendMsgTimeout());
+    }
+
+    public void requestAsync(String destination, Message<?> message, RequestCallback requestCallback, long timeout) {
+        requestAsync(destination, message, requestCallback, timeout, 0);
+    }
+
+    public void requestAsync(String destination, Object payload, RequestCallback requestCallback, long timeout) {
+        requestAsync(destination, payload, requestCallback, timeout, 0);
+    }
+
+    public void requestAsync(String destination, Message<?> message, RequestCallback requestCallback, long timeout, int delayLevel) {
+        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
+            log.error("send request message failed. destination:{}, message is null ", destination);
+            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
+        }
+
+        try {
+            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(rocketMQMessageConverter.getMessageConverter(),
+                charset, destination, message);
+            if (delayLevel > 0) {
+                rocketMsg.setDelayTimeLevel(delayLevel);
+            }
+            producer.request(rocketMsg, requestCallback, timeout);
+        } catch (Exception e) {
+            log.error("send request message failed. destination:{}, message:{} ", destination, message);
+            throw new MessagingException(e.getMessage(), e);
+        }
+    }
+
+    public void requestAsync(String destination, Object payload, RequestCallback requestCallback, long timeout, int delayLevel) {
+        Message<?> message = MessageBuilder.withPayload(payload).build();
+        requestAsync(destination, message, requestCallback, producer.getSendMsgTimeout(), delayLevel);
+    }
+
+    public void requestAsyncOrderly(String destination, Message<?> message, RequestCallback requestCallback, String hashKey) {
+        requestAsyncOrderly(destination, message, requestCallback, hashKey, producer.getSendMsgTimeout());
+    }
+
+    public void requestAsyncOrderly(String destination, Object payload, RequestCallback requestCallback, String hashKey) {
+        requestAsyncOrderly(destination, payload, requestCallback, hashKey, producer.getSendMsgTimeout());
+    }
+
+    public void requestAsyncOrderly(String destination, Message<?> message, RequestCallback requestCallback, String hashKey, long timeout) {
+        requestAsyncOrderly(destination, message, requestCallback, hashKey, timeout, 0);
+    }
+
+    public void requestAsyncOrderly(String destination, Object payload, RequestCallback requestCallback, String hashKey, long timeout) {
+        requestAsyncOrderly(destination, payload, requestCallback, hashKey, timeout, 0);
+    }
+
+    public void requestAsyncOrderly(String destination, Message<?> message, RequestCallback requestCallback, String hashKey, long timeout, int delayLevel) {
+        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
+            log.error("send request message failed. destination:{}, message is null ", destination);
+            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
+        }
+
+        try {
+            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(rocketMQMessageConverter.getMessageConverter(),
+                charset, destination, message);
+            if (delayLevel > 0) {
+                rocketMsg.setDelayTimeLevel(delayLevel);
+            }
+            if (Objects.isNull(hashKey) || hashKey.isEmpty()) {
+                producer.request(rocketMsg, requestCallback, timeout);
+            } else {
+                producer.request(rocketMsg, messageQueueSelector, hashKey, requestCallback, timeout);
+            }
+        } catch (Exception e) {
+            log.error("send request message failed. destination:{}, message:{} ", destination, message);
+            throw new MessagingException(e.getMessage(), e);
+        }
+    }
+
+    public void requestAsyncOrderly(String destination, Object payload, RequestCallback requestCallback, String hashKey, long timeout, int delayLevel) {
+        Message<?> message = MessageBuilder.withPayload(payload).build();
+        requestAsyncOrderly(destination, message, requestCallback, hashKey, producer.getSendMsgTimeout(), delayLevel);
+    }
+
     /**
      * <p> Send message in synchronous mode. This method returns only when the sending procedure totally completes.
      * Reliable synchronous transmission is used in extensive scenes, such as important notification messages, SMS
 
 Review comment:
   [Discuss] Sending a Spring message but returning a RocketMQ message may not be friendly for users, especially since the fully qualified name of the RocketMQ message class is so long. Do we need to expose the RocketMQ message to users?
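
One way to picture the alternative raised in this comment: the template converts the transport-level reply before returning it, so callers only name the type they expect, as the `sendAndReceive(..., String.class)` calls in the sample diff do. The following is a minimal sketch under stated assumptions, not the PR's implementation: `RawReply`, `transportRequest`, and `TypedReplySketch` are hypothetical stand-ins rather than rocketmq-spring classes, and only `String` and `byte[]` replies are handled.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the transport-level reply (not the real RocketMQ class).
final class RawReply {
    final byte[] body;

    RawReply(byte[] body) {
        this.body = body;
    }
}

final class TypedReplySketch {
    // Hypothetical transport call: answers every request with "reply to <payload>".
    static RawReply transportRequest(String payload) {
        return new RawReply(("reply to " + payload).getBytes(StandardCharsets.UTF_8));
    }

    // Returns the reply converted to the requested type, so the raw reply never leaks out.
    static <T> T sendAndReceive(String payload, Class<T> type) {
        RawReply raw = transportRequest(payload);
        if (String.class.equals(type)) {
            return type.cast(new String(raw.body, StandardCharsets.UTF_8));
        }
        if (byte[].class.equals(type)) {
            return type.cast(raw.body);
        }
        throw new IllegalArgumentException("unsupported reply type: " + type);
    }
}
```

With this shape the caller writes `String reply = TypedReplySketch.sendAndReceive("ping", String.class);` and never has to spell out the transport message type.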


[GitHub] [rocketmq-spring] RongtongJin commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring

Posted by GitBox <gi...@apache.org>.
RongtongJin commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring
URL: https://github.com/apache/rocketmq-spring/pull/209#discussion_r364545200
 
 

 ##########
 File path: rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java
 ##########
 @@ -116,6 +128,53 @@ public void onException(Throwable var1) {
 
         // Send transactional messages using extRocketMQTemplate
         testExtRocketMQTemplateTransaction();
+
+        // send request in sync mode and receive a reply of String type.
+        String replyString = rocketMQTemplate.sendAndReceive(stringRequestTopic, "request string", String.class);
+        System.out.printf("send %s and receive %s %n", "request string", replyString);
+
+        // send request in sync mode with timeout parameter and receive a reply of byte[] type.
+        byte[] replyBytes = rocketMQTemplate.sendAndReceive(bytesRequestTopic, new Message<String>() {
+            @Override public String getPayload() {
+                return "request byte[]";
+            }
+
+            @Override public MessageHeaders getHeaders() {
+                return null;
+            }
+        }, byte[].class, 3000);
+        System.out.printf("send %s and receive %s %n", "request byte[]", replyBytes.toString());
+
+        // send request in sync mode with hashKey parameter and receive a reply of User type.
+        User requestUser = new User().setUserAge((byte) 9).setUserName("requestUserName");
+        User replyUser = rocketMQTemplate.sendAndReceive(objectRequestTopic, requestUser, User.class, "order-id");
+        System.out.printf("send %s and receive %s %n", requestUser, replyUser);
+        // send request in sync mode with timeout and delayLevel parameters and receive a reply of generic type.
+        ProductWithPayload<String> replyGenericObject = rocketMQTemplate.sendAndReceive(genericRequestTopic, "request generic",
+            new TypeReference<ProductWithPayload<String>>() {
+            }.getType(), 30000, 2);
+        System.out.printf("send %s and receive %s %n", "request generic", replyGenericObject);
+
+        // send request in async mode and receive a reply of String type.
+        rocketMQTemplate.sendAndReceive(stringRequestTopic, "request string", new RocketMQLocalRequestCallback<String>() {
+            @Override public void onSuccess(String message) {
+                System.out.println("receive string: " + message);
+            }
+
+            @Override public void onException(Throwable e) {
+                e.printStackTrace();
+            }
+        });
+        // send request in async mode and receive a reply of User type.
+        rocketMQTemplate.sendAndReceive(objectRequestTopic, new User().setUserAge((byte) 9).setUserName("requestUserName"), new RocketMQLocalRequestCallback<User>() {
+            @Override public void onSuccess(User message) {
+                System.out.println("receive User: " + message.toString());
+            }
+
+            @Override public void onException(Throwable e) {
+                e.printStackTrace();
+            }
+        }, 5000);
 
 Review comment:
   It would be better to use `System.out.printf` instead of `System.out.println`.
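
For illustration, the callback body could follow the `printf` style that the synchronous lines of the sample already use. This is a minimal sketch, assuming a simplified callback: `ReplyCallbackSketch` and `PrintfSketch` are hypothetical names, not the library's callback interface.

```java
// Hypothetical callback type for illustration only; not the library's interface.
interface ReplyCallbackSketch<T> {
    void onSuccess(T reply);
}

final class PrintfSketch {
    // Builds the same line that printf would print, so the format can be checked on its own.
    static String render(String kind, Object reply) {
        return String.format("receive %s: %s%n", kind, reply);
    }

    public static void main(String[] args) {
        // printf takes a format string plus arguments instead of string concatenation.
        ReplyCallbackSketch<String> callback =
            reply -> System.out.printf("receive %s: %s%n", "string", reply);
        callback.onSuccess("reply string");
    }
}
```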


[GitHub] [rocketmq-spring] xiangwangcheng edited a comment on issue #209: [ISSUE #208]support request/reply model in rocketmq-spring

Posted by GitBox <gi...@apache.org>.
xiangwangcheng edited a comment on issue #209: [ISSUE #208]support request/reply model in rocketmq-spring
URL: https://github.com/apache/rocketmq-spring/pull/209#issuecomment-570932230
 
 
   I have made some changes after referring to the implementation of Kafka-spring:
   1. Reduce the number of methods.
   2. Use a generic type to receive the reply content.
   3. Add a new interface, `RocketMQReplyListener`, for consuming and replying.
   4. Conceal the reply logic on the consumer side.
   
   @vongosling @RongtongJin @zongtanghu @duhenglucky 
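
Items 3 and 4 can be pictured as a listener that only maps a request to a reply, with a container that takes care of sending the reply back. This is a rough sketch under assumptions, not the code in this PR: `ReplyListenerSketch`, `ReplyContainerSketch`, and the `replySender` consumer are hypothetical names, and the real `RocketMQReplyListener` may differ in shape.

```java
import java.util.function.Consumer;

// Hypothetical listener shape: consume a request of type T and return a reply of type R.
interface ReplyListenerSketch<T, R> {
    R onMessage(T request);
}

// Hypothetical container that hides the reply step from the listener.
final class ReplyContainerSketch<T, R> {
    private final ReplyListenerSketch<T, R> listener;
    private final Consumer<R> replySender; // stands in for whatever sends the reply back

    ReplyContainerSketch(ReplyListenerSketch<T, R> listener, Consumer<R> replySender) {
        this.listener = listener;
        this.replySender = replySender;
    }

    // Invokes the listener, then sends its non-null return value as the reply.
    void dispatch(T request) {
        R reply = listener.onMessage(request);
        if (reply != null) {
            replySender.accept(reply);
        }
    }
}
```

The listener author only writes the `onMessage` body; in this sketch, how and where the reply is sent stays inside the container.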


[GitHub] [rocketmq-spring] RongtongJin commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring

Posted by GitBox <gi...@apache.org>.
RongtongJin commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring
URL: https://github.com/apache/rocketmq-spring/pull/209#discussion_r363579325
 
 

 ##########
 File path: rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQReplyListener.java
 ##########
 @@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.spring.core;
+
+/**
+ * The consumer supported request-response model should implement this interface.
+ *
 
 Review comment:
   `The consumer supporting request-reply should implement this interface.` would be better.


[GitHub] [rocketmq-spring] xiangwangcheng commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring

Posted by GitBox <gi...@apache.org>.
xiangwangcheng commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring
URL: https://github.com/apache/rocketmq-spring/pull/209#discussion_r361101713
 
 

 ##########
 File path: rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
 ##########
 @@ -76,6 +80,179 @@ public void setMessageQueueSelector(MessageQueueSelector messageQueueSelector) {
         this.messageQueueSelector = messageQueueSelector;
     }
 
+    public org.apache.rocketmq.common.message.Message requestSync(String destination, Message<?> message) {
+        return requestSync(destination, message, producer.getSendMsgTimeout());
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSync(String destination, Object payload) {
+        return requestSync(destination, payload, producer.getSendMsgTimeout());
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSync(String destination, Message<?> message, long timeout) {
+        return requestSync(destination, message, timeout, 0);
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSync(String destination, Object payload, long timeout) {
+        return requestSync(destination, payload, timeout, 0);
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSync(String destination, Message<?> message, long timeout, int delayLevel) {
+        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
+            log.error("send request message failed. destination:{}, message is null ", destination);
+            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
+        }
+
+        try {
+            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(rocketMQMessageConverter.getMessageConverter(),
+                charset, destination, message);
+            if (delayLevel > 0) {
+                rocketMsg.setDelayTimeLevel(delayLevel);
+            }
+            org.apache.rocketmq.common.message.Message replyMessage;
+            replyMessage = producer.request(rocketMsg, timeout);
+            return replyMessage;
+        } catch (Exception e) {
+            log.error("send request message failed. destination:{}, message:{} ", destination, message);
+            throw new MessagingException(e.getMessage(), e);
+        }
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSync(String destination, Object payload, long timeout, int delayLevel) {
+        Message<?> message = MessageBuilder.withPayload(payload).build();
+        return requestSync(destination, message, timeout, delayLevel);
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Message<?> message, String hashKey) {
+        return requestSyncOrderly(destination, message, hashKey, producer.getSendMsgTimeout());
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Object payload, String hashKey) {
+        return requestSyncOrderly(destination, payload, hashKey, producer.getSendMsgTimeout());
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Message<?> message, String hashKey, long timeout) {
+        return requestSyncOrderly(destination, message, hashKey, timeout, 0);
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Object payload, String hashKey, long timeout) {
+        return requestSyncOrderly(destination, payload, hashKey, timeout, 0);
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Message<?> message, String hashKey, long timeout, int delayLevel) {
+        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
+            log.error("send request message failed. destination:{}, message is null ", destination);
+            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
+        }
+
+        try {
+            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(rocketMQMessageConverter.getMessageConverter(),
+                charset, destination, message);
+            if (delayLevel > 0) {
+                rocketMsg.setDelayTimeLevel(delayLevel);
+            }
+            org.apache.rocketmq.common.message.Message replyMessage;
 
 Review comment:
   Actually, a "variable initializer null here is redundant" warning will be shown if we initialize it to null.
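
The exchange about initializing `replyMessage` comes down to Java's definite-assignment rule: a local variable may be declared without an initializer as long as every path assigns it before use, which is what the `if`/`else` on `hashKey` in `requestSyncOrderly` does. Below is a minimal sketch with plain strings standing in for messages; `DefiniteAssignmentSketch` and its `request` method are hypothetical names.

```java
final class DefiniteAssignmentSketch {
    // Mirrors the branch on hashKey; strings stand in for the reply messages.
    static String request(String msg, String hashKey) {
        final String reply; // no initializer: the compiler verifies that each branch assigns it
        if (hashKey == null || hashKey.isEmpty()) {
            reply = "unordered:" + msg;
        } else {
            reply = "ordered[" + hashKey + "]:" + msg;
        }
        return reply;
    }
}
```

Writing `String reply = null;` instead would also compile, but the `null` would be overwritten on every path, which is the situation the quoted warning describes.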


[GitHub] [rocketmq-spring] xiangwangcheng commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring

Posted by GitBox <gi...@apache.org>.
xiangwangcheng commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring
URL: https://github.com/apache/rocketmq-spring/pull/209#discussion_r361122344
 
 

 ##########
 File path: rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
 ##########
 @@ -76,6 +80,179 @@ public void setMessageQueueSelector(MessageQueueSelector messageQueueSelector) {
         this.messageQueueSelector = messageQueueSelector;
     }
 
+    public org.apache.rocketmq.common.message.Message requestSync(String destination, Message<?> message) {
+        return requestSync(destination, message, producer.getSendMsgTimeout());
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSync(String destination, Object payload) {
+        return requestSync(destination, payload, producer.getSendMsgTimeout());
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSync(String destination, Message<?> message, long timeout) {
+        return requestSync(destination, message, timeout, 0);
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSync(String destination, Object payload, long timeout) {
+        return requestSync(destination, payload, timeout, 0);
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSync(String destination, Message<?> message, long timeout, int delayLevel) {
+        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
+            log.error("send request message failed. destination:{}, message is null ", destination);
+            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
+        }
+
+        try {
+            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(rocketMQMessageConverter.getMessageConverter(),
+                charset, destination, message);
+            if (delayLevel > 0) {
+                rocketMsg.setDelayTimeLevel(delayLevel);
+            }
+            org.apache.rocketmq.common.message.Message replyMessage;
+            replyMessage = producer.request(rocketMsg, timeout);
+            return replyMessage;
+        } catch (Exception e) {
+            log.error("send request message failed. destination:{}, message:{} ", destination, message);
+            throw new MessagingException(e.getMessage(), e);
+        }
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSync(String destination, Object payload, long timeout, int delayLevel) {
+        Message<?> message = MessageBuilder.withPayload(payload).build();
+        return requestSync(destination, message, timeout, delayLevel);
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Message<?> message, String hashKey) {
+        return requestSyncOrderly(destination, message, hashKey, producer.getSendMsgTimeout());
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Object payload, String hashKey) {
+        return requestSyncOrderly(destination, payload, hashKey, producer.getSendMsgTimeout());
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Message<?> message, String hashKey, long timeout) {
+        return requestSyncOrderly(destination, message, hashKey, timeout, 0);
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Object payload, String hashKey, long timeout) {
+        return requestSyncOrderly(destination, payload, hashKey, timeout, 0);
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Message<?> message, String hashKey, long timeout, int delayLevel) {
+        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
+            log.error("send request message failed. destination:{}, message is null ", destination);
+            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
+        }
+
+        try {
+            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(rocketMQMessageConverter.getMessageConverter(),
+                charset, destination, message);
+            if (delayLevel > 0) {
+                rocketMsg.setDelayTimeLevel(delayLevel);
+            }
+            org.apache.rocketmq.common.message.Message replyMessage;
+            if (Objects.isNull(hashKey) || hashKey.isEmpty()) {
+                replyMessage = producer.request(rocketMsg, timeout);
+            } else {
+                replyMessage = producer.request(rocketMsg, messageQueueSelector, hashKey, timeout);
+            }
+            return replyMessage;
+        } catch (Exception e) {
+            log.error("send request message failed. destination:{}, message:{} ", destination, message);
+            throw new MessagingException(e.getMessage(), e);
+        }
+    }
+
+    public org.apache.rocketmq.common.message.Message requestSyncOrderly(String destination, Object payload, String hashKey, long timeout, int delayLevel) {
+        Message<?> message = MessageBuilder.withPayload(payload).build();
+        return requestSyncOrderly(destination, message, hashKey, timeout, delayLevel);
+    }
+
+    public void requestAsync(String destination, Message<?> message, RequestCallback requestCallback) {
+        requestAsync(destination, message, requestCallback, producer.getSendMsgTimeout());
+    }
+
+    public void requestAsync(String destination, Object payload, RequestCallback requestCallback) {
+        Message<?> message = MessageBuilder.withPayload(payload).build();
+        requestAsync(destination, message, requestCallback, producer.getSendMsgTimeout());
+    }
+
+    public void requestAsync(String destination, Message<?> message, RequestCallback requestCallback, long timeout) {
+        requestAsync(destination, message, requestCallback, timeout, 0);
+    }
+
+    public void requestAsync(String destination, Object payload, RequestCallback requestCallback, long timeout) {
+        requestAsync(destination, payload, requestCallback, timeout, 0);
+    }
+
+    public void requestAsync(String destination, Message<?> message, RequestCallback requestCallback, long timeout, int delayLevel) {
+        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
+            log.error("send request message failed. destination:{}, message is null ", destination);
+            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
+        }
+
+        try {
+            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(rocketMQMessageConverter.getMessageConverter(),
+                charset, destination, message);
+            if (delayLevel > 0) {
+                rocketMsg.setDelayTimeLevel(delayLevel);
+            }
+            producer.request(rocketMsg, requestCallback, timeout);
+        } catch (Exception e) {
+            log.error("send request message failed. destination:{}, message:{} ", destination, message);
+            throw new MessagingException(e.getMessage(), e);
+        }
+    }
+
+    public void requestAsync(String destination, Object payload, RequestCallback requestCallback, long timeout, int delayLevel) {
+        Message<?> message = MessageBuilder.withPayload(payload).build();
+        requestAsync(destination, message, requestCallback, timeout, delayLevel);
+    }
+
+    public void requestAsyncOrderly(String destination, Message<?> message, RequestCallback requestCallback, String hashKey) {
+        requestAsyncOrderly(destination, message, requestCallback, hashKey, producer.getSendMsgTimeout());
+    }
+
+    public void requestAsyncOrderly(String destination, Object payload, RequestCallback requestCallback, String hashKey) {
+        requestAsyncOrderly(destination, payload, requestCallback, hashKey, producer.getSendMsgTimeout());
+    }
+
+    public void requestAsyncOrderly(String destination, Message<?> message, RequestCallback requestCallback, String hashKey, long timeout) {
+        requestAsyncOrderly(destination, message, requestCallback, hashKey, timeout, 0);
+    }
+
+    public void requestAsyncOrderly(String destination, Object payload, RequestCallback requestCallback, String hashKey, long timeout) {
+        requestAsyncOrderly(destination, payload, requestCallback, hashKey, timeout, 0);
+    }
+
+    public void requestAsyncOrderly(String destination, Message<?> message, RequestCallback requestCallback, String hashKey, long timeout, int delayLevel) {
+        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
+            log.error("send request message failed. destination:{}, message is null ", destination);
+            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
+        }
+
+        try {
+            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(rocketMQMessageConverter.getMessageConverter(),
+                charset, destination, message);
+            if (delayLevel > 0) {
+                rocketMsg.setDelayTimeLevel(delayLevel);
+            }
+            if (Objects.isNull(hashKey) || hashKey.isEmpty()) {
+                producer.request(rocketMsg, requestCallback, timeout);
+            } else {
+                producer.request(rocketMsg, messageQueueSelector, hashKey, requestCallback, timeout);
+            }
+        } catch (Exception e) {
+            log.error("send request message failed. destination:{}, message:{} ", destination, message);
+            throw new MessagingException(e.getMessage(), e);
+        }
+    }
+
+    public void requestAsyncOrderly(String destination, Object payload, RequestCallback requestCallback, String hashKey, long timeout, int delayLevel) {
+        Message<?> message = MessageBuilder.withPayload(payload).build();
+        requestAsyncOrderly(destination, message, requestCallback, hashKey, timeout, delayLevel);
+    }
+
     /**
      * <p> Send message in synchronous mode. This method returns only when the sending procedure totally completes.
      * Reliable synchronous transmission is used in extensive scenes, such as important notification messages, SMS
 
 Review comment:
   > [Discuss] Sending a Spring message but returning a RocketMQ message may not be friendly to users, especially since the fully qualified name of the RocketMQ message is so long. Do we need to expose the RocketMQ message to users?
   
   Yeah, I am not sure about that either, but I think users MAY need the attributes of the RocketMQ Message, such as topic/properties.
   
   > IMO, there are too many method names. Keep the same name, such as 'sendAndReceive', and use different parameters to distinguish different types of methods.
   
   I thought about that in the first place, but I changed my mind because the current implementation keeps the code simple and is more friendly to users.
   (It seems the `sendXXXOrderly` methods can be merged into `sendXXX`, but `requestSync` and `requestAsync` cannot be merged because the sync and async modes have different return types.)
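
On the naming question above: Java overloads may differ in return type as long as their parameter lists differ, so a sync and an async variant can in principle share one name. A minimal sketch under that assumption follows. `SendAndReceiveSketch` and its in-memory `transport` function are hypothetical stand-ins, not rocketmq-spring or RocketMQ APIs.

```java
import java.util.function.Consumer;
import java.util.function.Function;

public class SendAndReceiveSketch {
    // Hypothetical stand-in for the broker round trip: maps a request to a reply.
    private final Function<String, String> transport;

    public SendAndReceiveSketch(Function<String, String> transport) {
        this.transport = transport;
    }

    // Sync variant: returns the reply to the caller.
    public String sendAndReceive(String destination, String payload) {
        return transport.apply(destination + ":" + payload);
    }

    // Async-style variant: same name, extra callback parameter, void return.
    // For brevity the callback runs on the caller's thread; a real client
    // would invoke it later, when the reply arrives.
    public void sendAndReceive(String destination, String payload, Consumer<String> onReply) {
        onReply.accept(transport.apply(destination + ":" + payload));
    }

    public static void main(String[] args) {
        SendAndReceiveSketch template = new SendAndReceiveSketch(request -> "reply to " + request);
        System.out.println(template.sendAndReceive("topicA", "ping"));
        template.sendAndReceive("topicA", "ping", reply -> System.out.println("callback: " + reply));
    }
}
```

The compiler picks the overload from the argument list alone, so the differing return types do not conflict. Whether a single name is actually friendlier to users remains the design question debated in this thread.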


[GitHub] [rocketmq-spring] RongtongJin commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring

Posted by GitBox <gi...@apache.org>.
RongtongJin commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring
URL: https://github.com/apache/rocketmq-spring/pull/209#discussion_r364544802
 
 

 ##########
 File path: rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java
 ##########
 @@ -116,6 +128,53 @@ public void onException(Throwable var1) {
 
         // Send transactional messages using extRocketMQTemplate
         testExtRocketMQTemplateTransaction();
+
+        // send request in sync mode and receive a reply of String type.
 
 Review comment:
   It would be better to capitalize the first letter of comments.


[GitHub] [rocketmq-spring] RongtongJin commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring

Posted by GitBox <gi...@apache.org>.
RongtongJin commented on a change in pull request #209: [ISSUE #208]support request/reply model in rocketmq-spring
URL: https://github.com/apache/rocketmq-spring/pull/209#discussion_r363581165
 
 

 ##########
 File path: rocketmq-spring-boot-samples/rocketmq-produce-demo/src/main/java/org/apache/rocketmq/samples/springboot/ProducerApplication.java
 ##########
 @@ -116,6 +128,44 @@ public void onException(Throwable var1) {
 
         // Send transactional messages using extRocketMQTemplate
         testExtRocketMQTemplateTransaction();
+
+        // send request in sync mode and receive a reply of String type.
+        String replyString = rocketMQTemplate.sendAndReceive(stringRequestTopic, "request string", String.class);
+        System.out.printf("send %s and receive %s %n", "request string", replyString);
+
+        // send request in sync mode with timeout parameter and receive a reply of byte[] type.
+        byte[] replyBytes = rocketMQTemplate.sendAndReceive(bytesRequestTopic, new Message<String>() {
+            @Override public String getPayload() {
+                return "request byte[]";
+            }
+
+            @Override public MessageHeaders getHeaders() {
+                return null;
+            }
+        }, byte[].class, 3000);
+        System.out.printf("send %s and receive %s %n", "request byte[]", replyBytes.toString());
+
+        // send request in sync mode with hashKey parameter and receive a reply of User type.
+        User requestUser = new User().setUserAge(Byte.valueOf((byte) 9)).setUserName("requestUserName");
+        User replyUser = rocketMQTemplate.sendAndReceive(objectRequestTopic, requestUser, User.class, "order-id");
+        System.out.printf("send %s and receive %s %n", requestUser, replyUser);
+        // send request in sync mode with timeout and delayLevel parameters and receive a reply of generic type.
+        ProductWithPayload<String> replyGenericObject = rocketMQTemplate.sendAndReceive(genericRequestTopic, "request generic",
+            new TypeReference<ProductWithPayload<String>>() {
+            }.getType(), 30000, 2);
+        System.out.printf("send %s and receive %s %n", "request generic", replyGenericObject);
+
+        // send request in async mode and receive a reply of String type.
+        rocketMQTemplate.sendAndReceive(stringRequestTopic, "request string", new RequestCallback() {
+            @Override public void onSuccess(org.apache.rocketmq.common.message.Message message) {
+                System.out.print("receive reply content in callback: " + message.toString());
+            }
+
+            @Override public void onException(Throwable e) {
+                e.printStackTrace();
+            }
+        });
 
 Review comment:
   [Important] We need to switch RequestCallback, because it exposes the RocketMQ message to users.
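
One possible shape for such a replacement is a typed callback that is adapted to the raw one internally, so user code only sees the converted payload. This is a sketch under stated assumptions: `RawMessage`, `RawCallback`, `TypedCallback`, and `adapt` are hypothetical stand-ins defined here for illustration, not the RocketMQ client types or the API this PR ends up with.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

public class TypedCallbackSketch {
    // Hypothetical stand-in for the client's raw message type.
    static final class RawMessage {
        final byte[] body;
        RawMessage(byte[] body) { this.body = body; }
    }

    // Hypothetical stand-in for a client-level callback that receives raw messages.
    interface RawCallback {
        void onSuccess(RawMessage message);
        void onException(Throwable e);
    }

    // User-facing callback: sees only the converted payload of type T.
    interface TypedCallback<T> {
        void onSuccess(T reply);
        void onException(Throwable e);
    }

    // Wraps a typed callback so it can be passed where a RawCallback is expected.
    static <T> RawCallback adapt(TypedCallback<T> target, Function<byte[], T> converter) {
        return new RawCallback() {
            @Override public void onSuccess(RawMessage message) {
                T converted;
                try {
                    converted = converter.apply(message.body);
                } catch (RuntimeException e) {
                    target.onException(e); // conversion failures go to the error path
                    return;
                }
                target.onSuccess(converted);
            }

            @Override public void onException(Throwable e) {
                target.onException(e);
            }
        };
    }

    public static void main(String[] args) {
        RawCallback raw = adapt(new TypedCallback<String>() {
            @Override public void onSuccess(String reply) { System.out.println("reply: " + reply); }
            @Override public void onException(Throwable e) { e.printStackTrace(); }
        }, bytes -> new String(bytes, StandardCharsets.UTF_8));
        raw.onSuccess(new RawMessage("pong".getBytes(StandardCharsets.UTF_8)));
    }
}
```

With an adapter like this, the template can keep using the client's callback internally while the sample code above would no longer need to mention `org.apache.rocketmq.common.message.Message`.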
