You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2023/01/29 06:41:33 UTC

[rocketmq-spring] branch master updated: [ISSUE #506] support send message with arbitrarily delay time (#515)

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git


The following commit(s) were added to refs/heads/master by this push:
     new 243820c  [ISSUE #506] support send message with arbitrarily delay time (#515)
243820c is described below

commit 243820c18e2532a9e2d68accfa0a13c6802850a6
Author: zhouchunhai <11...@users.noreply.github.com>
AuthorDate: Sun Jan 29 14:41:27 2023 +0800

    [ISSUE #506] support send message with arbitrarily delay time (#515)
    
    * support sync send message with arbitrarily delay time.
    
    * support sync send message with arbitrarily delay time.
    
    * fix check style error.
---
 .../rocketmq/spring/core/RocketMQTemplate.java     | 72 ++++++++++++++++++++++
 .../apache/rocketmq/spring/support/DelayMode.java  | 24 ++++++++
 .../rocketmq/spring/core/RocketMQTemplateTest.java |  6 ++
 3 files changed, 102 insertions(+)

diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
index f1f7886..8b444e3 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
@@ -31,6 +31,7 @@ import org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash;
 import org.apache.rocketmq.common.message.MessageBatch;
 import org.apache.rocketmq.common.message.MessageClientIDSetter;
 import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.spring.support.DelayMode;
 import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
 import org.apache.rocketmq.spring.support.RocketMQUtil;
 import org.slf4j.Logger;
@@ -535,6 +536,77 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
         }
     }
 
+    /**
+     * Same to {@link #syncSend(String, Message)} with send delay time specified in addition.
+     *
+     * @param destination formats: `topicName:tags`
+     * @param message {@link org.springframework.messaging.Message}
+     * @param delayTime delay time in seconds for message
+     * @return {@link SendResult}
+     */
+    public SendResult syncSendDelayTimeSeconds(String destination, Message<?> message, long delayTime) {
+        return syncSend(destination, message, producer.getSendMsgTimeout(), delayTime, DelayMode.DELAY_SECONDS);
+    }
+
+    /**
+     * Same to {@link #syncSend(String, Object)} with send delayTime specified in addition.
+     *
+     * @param destination formats: `topicName:tags`
+     * @param payload the Object to use as payload
+     * @param delayTime delay time in seconds for message
+     * @return {@link SendResult}
+     */
+    public SendResult syncSendDelayTimeSeconds(String destination, Object payload, long delayTime) {
+        Message<?> message = MessageBuilder.withPayload(payload).build();
+        return syncSend(destination, message, producer.getSendMsgTimeout(), delayTime, DelayMode.DELAY_SECONDS);
+    }
+
+    /**
+     * Same to {@link #syncSend(String, Message)} with send timeout and delay time specified in addition.
+     *
+     * @param destination formats: `topicName:tags`
+     * @param message {@link org.springframework.messaging.Message}
+     * @param timeout send timeout with millis
+     * @param delayTime delay time for message
+     * @return {@link SendResult}
+     */
+    public SendResult syncSend(String destination, Message<?> message, long timeout, long delayTime, DelayMode mode) {
+        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
+            log.error("syncSend failed. destination:{}, message is null ", destination);
+            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
+        }
+        try {
+            long now = System.currentTimeMillis();
+            org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
+            if (delayTime > 0 && Objects.nonNull(mode)) {
+                switch (mode) {
+                    case DELAY_SECONDS:
+                        rocketMsg.setDelayTimeSec(delayTime);
+                        break;
+                    case DELAY_MILLISECONDS:
+                        rocketMsg.setDelayTimeMs(delayTime);
+                        break;
+                    case DELIVER_TIME_MILLISECONDS:
+                        rocketMsg.setDeliverTimeMs(delayTime);
+                        break;
+                    default:
+                        log.warn("delay mode: {} not support", mode);
+                }
+            }
+            SendResult sendResult = producer.send(rocketMsg, timeout);
+            long costTime = System.currentTimeMillis() - now;
+            if (log.isDebugEnabled()) {
+                log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
+            }
+            return sendResult;
+        } catch (Exception e) {
+            log.error("syncSend failed. destination:{}, message:{}, detail exception info: ", destination, message, e);
+            throw new MessagingException(e.getMessage(), e);
+        }
+    }
+
+
+
     /**
      * Same to {@link #syncSend(String, Message)} with send timeout specified in addition.
      *
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DelayMode.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DelayMode.java
new file mode 100644
index 0000000..ff70c89
--- /dev/null
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DelayMode.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.spring.support;
+
+public enum DelayMode {
+    DELAY_SECONDS,
+    DELAY_MILLISECONDS,
+    DELIVER_TIME_MILLISECONDS,
+}
diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java
index 49ce039..256345a 100644
--- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java
+++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java
@@ -94,6 +94,12 @@ public class RocketMQTemplateTest {
         } catch (MessagingException e) {
             assertThat(e).hasMessageContaining("org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to null failed");
         }
+
+        try {
+            rocketMQTemplate.syncSendDelayTimeSeconds(topic, "payload", 10L);
+        } catch (MessagingException e) {
+            assertThat(e).hasMessageContaining("org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to null failed");
+        }
     }
     @Test
     public void testAsyncBatchSendMessage() {