You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2020/02/21 07:35:29 UTC

[rocketmq-spring.wiki] branch master updated: Created Request-reply消息发送 (markdown)

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.wiki.git


The following commit(s) were added to refs/heads/master by this push:
     new bcfd343  Created Request-reply消息发送 (markdown)
bcfd343 is described below

commit bcfd343c3385d8223c5486db84320e9c173e72f1
Author: rongtong <ji...@163.com>
AuthorDate: Fri Feb 21 15:35:20 2020 +0800

    Created Request-reply消息发送 (markdown)
---
 ...266\210\346\201\257\345\217\221\351\200\201.md" | 88 ++++++++++++++++++++++
 1 file changed, 88 insertions(+)

diff --git "a/Request-reply\346\266\210\346\201\257\345\217\221\351\200\201.md" "b/Request-reply\346\266\210\346\201\257\345\217\221\351\200\201.md"
new file mode 100644
index 0000000..e185bb7
--- /dev/null
+++ "b/Request-reply\346\266\210\346\201\257\345\217\221\351\200\201.md"
@@ -0,0 +1,88 @@
+RocketMQ-Spring 提供 请求/应答 语义支持。
+
+- Producer端
+
+发送 Request 消息使用 `sendAndReceive` 方法
+
+> 注意
+>
+> 同步发送需要在方法的参数中指明返回值类型
+>
+> 异步发送需要在回调的接口中指明返回值类型
+
+```java
+@SpringBootApplication
+public class ProducerApplication implements CommandLineRunner {
+    @Resource
+    private RocketMQTemplate rocketMQTemplate;
+    
+    public static void main(String[] args) {
+        SpringApplication.run(ProducerApplication.class, args);
+    }
+    
+    @Override public void run(String... args) throws Exception {
+        // Send a request synchronously; the last argument names the expected reply type (String).
+        String replyString = rocketMQTemplate.sendAndReceive("stringRequestTopic", "request string", String.class);
+        
+        // Send a request asynchronously; the callback's type parameter names the expected reply type (User).
+        rocketMQTemplate.sendAndReceive("objectRequestTopic", new User("requestUserName",(byte) 9), new RocketMQLocalRequestCallback<User>() {
+            @Override public void onSuccess(User message) {
+                System.out.printf("send user object and receive %s %n", message.toString());
+            }
+
+            @Override public void onException(Throwable e) {
+                e.printStackTrace();
+            }
+        }, 5000);
+    }
+    // Static nested class: a User carries no hidden reference to the enclosing ProducerApplication instance.
+    @Data
+    @AllArgsConstructor
+    public static class User implements Serializable {
+        private String userName;
+        private Byte userAge;
+    }
+}
+```
+
+- Consumer端
+
+需要实现 `RocketMQReplyListener<T, R>` 接口,其中 T 表示接收值的类型,R 表示返回值的类型。
+
+```java
+@SpringBootApplication
+public class ConsumerApplication {
+    // Nested classes are static: Spring component scanning only registers top-level or static nested classes.
+    public static void main(String[] args) {
+        SpringApplication.run(ConsumerApplication.class, args);
+    }
+    // Handles String requests from "stringRequestTopic" and answers each one with a fixed String reply.
+    @Service
+    @RocketMQMessageListener(topic = "stringRequestTopic", consumerGroup = "stringRequestConsumer")
+    public static class StringConsumerWithReplyString implements RocketMQReplyListener<String, String> {
+        @Override
+        public String onMessage(String message) {
+            System.out.printf("------- StringConsumerWithReplyString received: %s \n", message);
+            return "reply string";
+        }
+    }
+    // Handles User requests from "objectRequestTopic"; the return type must be the R of RocketMQReplyListener<T, R>.
+    @Service
+    @RocketMQMessageListener(topic = "objectRequestTopic", consumerGroup = "objectRequestConsumer")
+    public static class ObjectConsumerWithReplyUser implements RocketMQReplyListener<User, User> {
+        @Override
+        public User onMessage(User user) {
+            System.out.printf("------- ObjectConsumerWithReplyUser received: %s \n", user);
+            return new User("replyUserName",(byte) 10);
+        }
+    }
+    // Request/reply payload; static so it can be created without an enclosing ConsumerApplication instance.
+    @Data
+    @AllArgsConstructor
+    public static class User implements Serializable {
+        private String userName;
+        private Byte userAge;
+    }
+}
+```
+