You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2023/01/29 06:41:33 UTC
[rocketmq-spring] branch master updated: [ISSUE #506] support send message with arbitrarily delay time (#515)
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git
The following commit(s) were added to refs/heads/master by this push:
new 243820c [ISSUE #506] support send message with arbitrarily delay time (#515)
243820c is described below
commit 243820c18e2532a9e2d68accfa0a13c6802850a6
Author: zhouchunhai <11...@users.noreply.github.com>
AuthorDate: Sun Jan 29 14:41:27 2023 +0800
[ISSUE #506] support send message with arbitrarily delay time (#515)
* support sync send message with arbitrarily delay time.
* support sync send message with arbitrarily delay time.
* fix check style error.
---
.../rocketmq/spring/core/RocketMQTemplate.java | 72 ++++++++++++++++++++++
.../apache/rocketmq/spring/support/DelayMode.java | 24 ++++++++
.../rocketmq/spring/core/RocketMQTemplateTest.java | 6 ++
3 files changed, 102 insertions(+)
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
index f1f7886..8b444e3 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
@@ -31,6 +31,7 @@ import org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash;
import org.apache.rocketmq.common.message.MessageBatch;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.spring.support.DelayMode;
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.slf4j.Logger;
@@ -535,6 +536,77 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
}
}
+ /**
+ * Same to {@link #syncSend(String, Message)} with send delay time specified in addition.
+ *
+ * @param destination formats: `topicName:tags`
+ * @param message {@link org.springframework.messaging.Message}
+ * @param delayTime delay time in seconds for message
+ * @return {@link SendResult}
+ */
+ public SendResult syncSendDelayTimeSeconds(String destination, Message<?> message, long delayTime) {
+ return syncSend(destination, message, producer.getSendMsgTimeout(), delayTime, DelayMode.DELAY_SECONDS);
+ }
+
+ /**
+ * Same to {@link #syncSend(String, Object)} with send delayTime specified in addition.
+ *
+ * @param destination formats: `topicName:tags`
+ * @param payload the Object to use as payload
+ * @param delayTime delay time in seconds for message
+ * @return {@link SendResult}
+ */
+ public SendResult syncSendDelayTimeSeconds(String destination, Object payload, long delayTime) {
+ Message<?> message = MessageBuilder.withPayload(payload).build();
+ return syncSend(destination, message, producer.getSendMsgTimeout(), delayTime, DelayMode.DELAY_SECONDS);
+ }
+
+ /**
+ * Same to {@link #syncSend(String, Message)} with send timeout and delay time specified in addition.
+ *
+ * @param destination formats: `topicName:tags`
+ * @param message {@link org.springframework.messaging.Message}
+ * @param timeout send timeout with millis
+ * @param delayTime delay time for message
+ * @return {@link SendResult}
+ */
+ public SendResult syncSend(String destination, Message<?> message, long timeout, long delayTime, DelayMode mode) {
+ if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
+ log.error("syncSend failed. destination:{}, message is null ", destination);
+ throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
+ }
+ try {
+ long now = System.currentTimeMillis();
+ org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);
+ if (delayTime > 0 && Objects.nonNull(mode)) {
+ switch (mode) {
+ case DELAY_SECONDS:
+ rocketMsg.setDelayTimeSec(delayTime);
+ break;
+ case DELAY_MILLISECONDS:
+ rocketMsg.setDelayTimeMs(delayTime);
+ break;
+ case DELIVER_TIME_MILLISECONDS:
+ rocketMsg.setDeliverTimeMs(delayTime);
+ break;
+ default:
+ log.warn("delay mode: {} not support", mode);
+ }
+ }
+ SendResult sendResult = producer.send(rocketMsg, timeout);
+ long costTime = System.currentTimeMillis() - now;
+ if (log.isDebugEnabled()) {
+ log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
+ }
+ return sendResult;
+ } catch (Exception e) {
+ log.error("syncSend failed. destination:{}, message:{}, detail exception info: ", destination, message, e);
+ throw new MessagingException(e.getMessage(), e);
+ }
+ }
+
+
+
/**
* Same to {@link #syncSend(String, Message)} with send timeout specified in addition.
*
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DelayMode.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DelayMode.java
new file mode 100644
index 0000000..ff70c89
--- /dev/null
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DelayMode.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.spring.support;
+
+public enum DelayMode {
+ DELAY_SECONDS,
+ DELAY_MILLISECONDS,
+ DELIVER_TIME_MILLISECONDS,
+}
diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java
index 49ce039..256345a 100644
--- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java
+++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java
@@ -94,6 +94,12 @@ public class RocketMQTemplateTest {
} catch (MessagingException e) {
assertThat(e).hasMessageContaining("org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to null failed");
}
+
+ try {
+ rocketMQTemplate.syncSendDelayTimeSeconds(topic, "payload", 10L);
+ } catch (MessagingException e) {
+ assertThat(e).hasMessageContaining("org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to null failed");
+ }
}
@Test
public void testAsyncBatchSendMessage() {