You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2020/02/21 07:35:29 UTC
[rocketmq-spring.wiki] branch master updated: Created Request-reply消息发送 (markdown)
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.wiki.git
The following commit(s) were added to refs/heads/master by this push:
new bcfd343 Created Request-reply消息发送 (markdown)
bcfd343 is described below
commit bcfd343c3385d8223c5486db84320e9c173e72f1
Author: rongtong <ji...@163.com>
AuthorDate: Fri Feb 21 15:35:20 2020 +0800
Created Request-reply消息发送 (markdown)
---
...266\210\346\201\257\345\217\221\351\200\201.md" | 88 ++++++++++++++++++++++
1 file changed, 88 insertions(+)
diff --git "a/Request-reply\346\266\210\346\201\257\345\217\221\351\200\201.md" "b/Request-reply\346\266\210\346\201\257\345\217\221\351\200\201.md"
new file mode 100644
index 0000000..e185bb7
--- /dev/null
+++ "b/Request-reply\346\266\210\346\201\257\345\217\221\351\200\201.md"
@@ -0,0 +1,88 @@
+RocketMQ-Spring 提供 请求/应答 语义支持。
+
+- Producer端
+
+发送Request消息使用sendAndReceive方法
+
+> 注意
+>
+> 同步发送需要在方法的参数中指明返回值类型
+>
+> 异步发送需要在回调的接口中指明返回值类型
+
+```java
+@SpringBootApplication
+public class ProducerApplication implements CommandLineRunner{ // demo producer: sends one synchronous and one asynchronous request
+    @Resource
+    private RocketMQTemplate rocketMQTemplate;
+
+    public static void main(String[] args){
+        SpringApplication.run(ProducerApplication.class, args);
+    }
+
+    @Override public void run(String... args) throws Exception {
+        // Synchronous request: the expected reply type (String) is passed as the last argument.
+        String replyString = rocketMQTemplate.sendAndReceive("stringRequestTopic", "request string", String.class);
+
+        // Asynchronous request: the expected reply type (User) is declared by the callback's type argument.
+        rocketMQTemplate.sendAndReceive("objectRequestTopic", new User("requestUserName",(byte) 9), new RocketMQLocalRequestCallback<User>() {
+            @Override public void onSuccess(User message) {
+                System.out.printf("send user object and receive %s %n", message.toString());
+            }
+
+            @Override public void onException(Throwable e) {
+                e.printStackTrace();
+            }
+        }, 5000);
+    }
+
+    @Data
+    @AllArgsConstructor
+    public static class User implements Serializable{ // static: a payload type must not carry a hidden reference to the enclosing application
+        private String userName;
+        private Byte userAge;
+    }
+}
+```
+
+- Consumer端
+
+需要实现RocketMQReplyListener<T, R> 接口,其中T表示接收值的类型,R表示返回值的类型。
+
+```java
+@SpringBootApplication
+public class ConsumerApplication{ // demo consumer: hosts two reply listeners, one per request topic
+
+    public static void main(String[] args){
+        SpringApplication.run(ConsumerApplication.class, args);
+    }
+
+    @Service
+    @RocketMQMessageListener(topic = "stringRequestTopic", consumerGroup = "stringRequestConsumer")
+    public static class StringConsumerWithReplyString implements RocketMQReplyListener<String, String> { // static so component scanning can pick it up
+        @Override
+        public String onMessage(String message) {
+            System.out.printf("------- StringConsumerWithReplyString received: %s \n", message);
+            return "reply string";
+        }
+    }
+
+    @Service
+    @RocketMQMessageListener(topic = "objectRequestTopic", consumerGroup = "objectRequestConsumer")
+    public static class ObjectConsumerWithReplyUser implements RocketMQReplyListener<User, User>{ // static so component scanning can pick it up
+        @Override public User onMessage(User user) { // return type must be User (the listener's R type), not void
+            System.out.printf("------- ObjectConsumerWithReplyUser received: %s \n", user);
+            User replyUser = new User("replyUserName",(byte) 10);
+            return replyUser;
+        }
+    }
+
+    @Data
+    @AllArgsConstructor
+    public static class User implements Serializable{ // static: constructed from the static listener above without an enclosing instance
+        private String userName;
+        private Byte userAge;
+    }
+}
+```
+