You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2021/09/09 10:07:30 UTC

[GitHub] [rocketmq] dragonTalon opened a new pull request #3337: 实现了一个自定义时间的延迟消息(add function--Custom delay time)

dragonTalon opened a new pull request #3337:
URL: https://github.com/apache/rocketmq/pull/3337


   **Make sure set the target branch to `develop`**
   
   ## What is the purpose of the change
   
   增加了一个自定义延迟消息的实现,通过message 的property中的属性来实现
   
   Added the realization of a custom delayed message, which is realized through the properties in the property of the message
   
   property name :SPECIFY_DELAY_TIME
   
   value : 
   格式为: ()d()h()m()s
   例: 1d1h1m1s 表示为 延迟 1天加1小时加1分钟加1秒
   
   The format is: ()d()h()m()s
   Example: 1d1h1m1s is expressed as delay 1 day plus 1 hour plus 1 minute plus 1 second
   
   ## Brief changelog
   
   XX
   
   ## Verifying this change
   
   XXXX
   验证方式:(test way)
   生产者(production)
   `
      DefaultMQProducer producer = new DefaultMQProducer("DELAY_GROUP_TAG");
   
           producer.setNamesrvAddr("127.0.0.1:9876");
   
           producer.start();
   
           final byte[] bytes = "this is cutomer 2m2s message" .getBytes(StandardCharsets.UTF_8);
   
           Message message = new Message("DELAY_TOPIC_TAG_1", "*", bytes);
   
           //这是我实现延迟队列用到的
   
           message.putUserProperty("SPECIFY_DELAY_TIME", "2m2s");
   
           producer.sendOneway(message);
   
           producer.shutdown();
   `
   消费者:(consume)
   `
       public static void main(String[] args) throws MQClientException {
   
           DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DELAY_GROUP_TAG");
   
           consumer.setNamesrvAddr("127.0.0.1:9876");
   
           consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
   
           //Topic的名称
   
           consumer.subscribe("DELAY_TOPIC_TAG_1", "*");
   
           consumer.setSuspendCurrentQueueTimeMillis(30);
   
           consumer.registerMessageListener(new MessageListenerConcurrently() {
   
               @Override
               public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
   
                   for (MessageExt ext : list) {
   
                       final long time = System.currentTimeMillis() - ext.getBornTimestamp();
   
                       System.out.println("消息\t" + new String(ext.getBody(), StandardCharsets.UTF_8) + "接收到的消息间隔消息\t" + time / 1000.0 );
   
                   }
   
                   return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
               }
   
           });
   
           consumer.start();
       }
   `
   [这是我的验证结果文档](https://zhuanlan.zhihu.com/p/408702118)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] dragonTalon commented on pull request #3337: [ISSUE #3359] Add function--Custom delay time

Posted by GitBox <gi...@apache.org>.
dragonTalon commented on pull request #3337:
URL: https://github.com/apache/rocketmq/pull/3337#issuecomment-956231283


    @Jason918 
   there any problem with this pr merger?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] dragonTalon commented on pull request #3337: [ISSUE #3359] Add function--Custom delay time

Posted by GitBox <gi...@apache.org>.
dragonTalon commented on pull request #3337:
URL: https://github.com/apache/rocketmq/pull/3337#issuecomment-920931808


   > Could you please to write a issue firstly, you can refer other issues' formate! @dragonTalon
   
   ok,i change my pr add issue;You can take a look


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] ShannonDing commented on pull request #3337: 实现了一个自定义时间的延迟消息(add function--Custom delay time)

Posted by GitBox <gi...@apache.org>.
ShannonDing commented on pull request #3337:
URL: https://github.com/apache/rocketmq/pull/3337#issuecomment-916693617


   Greate. it is better to change this PR to develop branch.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] dragonTalon commented on a change in pull request #3337: [ISSUE #3359] Add function--Custom delay time

Posted by GitBox <gi...@apache.org>.
dragonTalon commented on a change in pull request #3337:
URL: https://github.com/apache/rocketmq/pull/3337#discussion_r741944439



##########
File path: broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
##########
@@ -212,10 +212,10 @@ public static BrokerController createBrokerController(String[] args) {
             MixAll.printObjectProperties(log, messageStoreConfig);
 
             final BrokerController controller = new BrokerController(
-                brokerConfig,
-                nettyServerConfig,
-                nettyClientConfig,
-                messageStoreConfig);
+                    brokerConfig,

Review comment:
       ok ,i reduction this file




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] Jason918 commented on a change in pull request #3337: [ISSUE #3359] Add function--Custom delay time

Posted by GitBox <gi...@apache.org>.
Jason918 commented on a change in pull request #3337:
URL: https://github.com/apache/rocketmq/pull/3337#discussion_r718991689



##########
File path: common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
##########
@@ -52,6 +52,10 @@
     public static final String PROPERTY_PUSH_REPLY_TIME = "PUSH_REPLY_TIME";
     public static final String PROPERTY_CLUSTER = "CLUSTER";
     public static final String PROPERTY_MESSAGE_TYPE = "MSG_TYPE";
+    public static final String PROPERTY_SPECIFY_DELAY_TIME = "SPECIFY_DELAY_TIME";

Review comment:
       Maybe "CUSTOM_DELAY_TIME" is a better name? 

##########
File path: namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java
##########
@@ -48,6 +50,7 @@
     private static CommandLine commandLine = null;
 
     public static void main(String[] args) {
+        System.setProperty(MixAll.ROCKETMQ_HOME_PROPERTY, "/Users/dragonboy/IdeaProjects/rocketmq/distribution");

Review comment:
       Test code? 

##########
File path: store/src/main/java/org/apache/rocketmq/store/schedule/CustomDelayMessageService.java
##########
@@ -0,0 +1,496 @@
+package org.apache.rocketmq.store.schedule;
+
+import io.netty.util.concurrent.DefaultThreadFactory;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.ConfigManager;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicFilterType;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.running.RunningStats;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.store.*;
+import org.apache.rocketmq.store.config.StorePathConfigHelper;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * @ClassName CutomDelayMessageService
+ * @Version 1.0
+ * @Author dragonboy
+ * @Date 2021/9/6 14:12
+ * @Description
+ **/
+public class CustomDelayMessageService extends ConfigManager {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+
+    public static final String SCHEDULE_TOPIC = "DRAGON_TOPIC_XXX";
+
+    private static final Pattern pattern = Pattern.compile("([0-9]+d)*([0-9]+h)*([0-9]+m)*([0-9]+s)*");
+
+    private static final HashMap<String, Long> timeUnitTable = new HashMap<String, Long>() {{
+        this.put("s", 1000L);
+        this.put("m", 1000L * 60);
+        this.put("h", 1000L * 60 * 60);
+        this.put("d", 1000L * 60 * 60 * 24);
+    }};
+
+    //最大限度的处理层级
+    public static int MAX_LIMIT_LEVEL = 0;

Review comment:
       This should not be constant and not configurable.

##########
File path: store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java
##########
@@ -48,6 +48,10 @@ public static String getDelayOffsetStorePath(final String rootDir) {
         return rootDir + File.separator + "config" + File.separator + "delayOffset.json";
     }
 
+    public static String getSpcifyDelayOffsetStorePath(final String rootDir) {
+        return rootDir + File.separator + "dragon" + File.separator + "delayOffset.json";

Review comment:
       "dragon" here is a little confusing.

##########
File path: store/src/main/java/org/apache/rocketmq/store/schedule/CustomDelayMessageService.java
##########
@@ -0,0 +1,496 @@
+package org.apache.rocketmq.store.schedule;
+
+import io.netty.util.concurrent.DefaultThreadFactory;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.ConfigManager;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicFilterType;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.running.RunningStats;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.store.*;
+import org.apache.rocketmq.store.config.StorePathConfigHelper;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * @ClassName CutomDelayMessageService
+ * @Version 1.0
+ * @Author dragonboy
+ * @Date 2021/9/6 14:12
+ * @Description
+ **/
+public class CustomDelayMessageService extends ConfigManager {

Review comment:
       Please add some unit tests.

##########
File path: broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
##########
@@ -64,7 +64,7 @@ public static BrokerController start(BrokerController controller) {
             controller.start();
 
             String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", "
-                + controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
+                    + controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();

Review comment:
       I think it's better not changing the format of old code without logical modification.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] Jason918 commented on a change in pull request #3337: [ISSUE #3359] Add function--Custom delay time

Posted by GitBox <gi...@apache.org>.
Jason918 commented on a change in pull request #3337:
URL: https://github.com/apache/rocketmq/pull/3337#discussion_r741934726



##########
File path: broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
##########
@@ -212,10 +212,10 @@ public static BrokerController createBrokerController(String[] args) {
             MixAll.printObjectProperties(log, messageStoreConfig);
 
             final BrokerController controller = new BrokerController(
-                brokerConfig,
-                nettyServerConfig,
-                nettyClientConfig,
-                messageStoreConfig);
+                    brokerConfig,

Review comment:
       It seems that only the indention is changed in this file. 
   IMHO, It's better keep as it is, because it will change the log history, and it will be confusing if someone want to check the implementation details through the git log.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] areyouok commented on pull request #3337: [ISSUE #3359] Add function--Custom delay time

Posted by GitBox <gi...@apache.org>.
areyouok commented on pull request #3337:
URL: https://github.com/apache/rocketmq/pull/3337#issuecomment-985449521


   @dragonTalon
   Thanks for your contribution. I think this solution introduce too many write.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] dragonTalon commented on a change in pull request #3337: [ISSUE #3359] Add function--Custom delay time

Posted by GitBox <gi...@apache.org>.
dragonTalon commented on a change in pull request #3337:
URL: https://github.com/apache/rocketmq/pull/3337#discussion_r741944439



##########
File path: broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
##########
@@ -212,10 +212,10 @@ public static BrokerController createBrokerController(String[] args) {
             MixAll.printObjectProperties(log, messageStoreConfig);
 
             final BrokerController controller = new BrokerController(
-                brokerConfig,
-                nettyServerConfig,
-                nettyClientConfig,
-                messageStoreConfig);
+                    brokerConfig,

Review comment:
       ok ,i reduction this file

##########
File path: broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
##########
@@ -212,10 +212,10 @@ public static BrokerController createBrokerController(String[] args) {
             MixAll.printObjectProperties(log, messageStoreConfig);
 
             final BrokerController controller = new BrokerController(
-                brokerConfig,
-                nettyServerConfig,
-                nettyClientConfig,
-                messageStoreConfig);
+                    brokerConfig,

Review comment:
       ok ,i reduction this file




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] Jason918 commented on a change in pull request #3337: [ISSUE #3359] Add function--Custom delay time

Posted by GitBox <gi...@apache.org>.
Jason918 commented on a change in pull request #3337:
URL: https://github.com/apache/rocketmq/pull/3337#discussion_r741934726



##########
File path: broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
##########
@@ -212,10 +212,10 @@ public static BrokerController createBrokerController(String[] args) {
             MixAll.printObjectProperties(log, messageStoreConfig);
 
             final BrokerController controller = new BrokerController(
-                brokerConfig,
-                nettyServerConfig,
-                nettyClientConfig,
-                messageStoreConfig);
+                    brokerConfig,

Review comment:
       It seems that only the indention is changed in this file. 
   IMHO, It's better keep as it is, because it will change the log history, and it will be confusing if someone want to check the implementation details through the git log.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] dragonTalon commented on pull request #3337: [ISSUE #3359] Add function--Custom delay time

Posted by GitBox <gi...@apache.org>.
dragonTalon commented on pull request #3337:
URL: https://github.com/apache/rocketmq/pull/3337#issuecomment-986840176


   > Thanks for your contribution. I think this solution introduce too many write.
   
   yes,i  use many consumerQueue save task.
   I use to a method of Layering time slicing. If you do not perform slicing but repeat in a consumerQueue, there may be errors if there are more delayed tasks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] Jason918 commented on a change in pull request #3337: [ISSUE #3359] Add function--Custom delay time

Posted by GitBox <gi...@apache.org>.
Jason918 commented on a change in pull request #3337:
URL: https://github.com/apache/rocketmq/pull/3337#discussion_r741934726



##########
File path: broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
##########
@@ -212,10 +212,10 @@ public static BrokerController createBrokerController(String[] args) {
             MixAll.printObjectProperties(log, messageStoreConfig);
 
             final BrokerController controller = new BrokerController(
-                brokerConfig,
-                nettyServerConfig,
-                nettyClientConfig,
-                messageStoreConfig);
+                    brokerConfig,

Review comment:
       It seems that only the indention is changed in this file. 
   IMHO, It's better keep as it is, because it will change the log history, and it will be confusing if someone want to check the implementation details through the git log.

##########
File path: broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
##########
@@ -212,10 +212,10 @@ public static BrokerController createBrokerController(String[] args) {
             MixAll.printObjectProperties(log, messageStoreConfig);
 
             final BrokerController controller = new BrokerController(
-                brokerConfig,
-                nettyServerConfig,
-                nettyClientConfig,
-                messageStoreConfig);
+                    brokerConfig,

Review comment:
       It seems that only the indention is changed in this file. 
   IMHO, It's better keep as it is, because it will change the log history, and it will be confusing if someone want to check the implementation details through the git log.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] dragonTalon commented on a change in pull request #3337: [ISSUE #3359] Add function--Custom delay time

Posted by GitBox <gi...@apache.org>.
dragonTalon commented on a change in pull request #3337:
URL: https://github.com/apache/rocketmq/pull/3337#discussion_r719003969



##########
File path: store/src/main/java/org/apache/rocketmq/store/schedule/CustomDelayMessageService.java
##########
@@ -0,0 +1,496 @@
+package org.apache.rocketmq.store.schedule;
+
+import io.netty.util.concurrent.DefaultThreadFactory;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.ConfigManager;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicFilterType;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.running.RunningStats;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.store.*;
+import org.apache.rocketmq.store.config.StorePathConfigHelper;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * @ClassName CutomDelayMessageService
+ * @Version 1.0
+ * @Author dragonboy
+ * @Date 2021/9/6 14:12
+ * @Description
+ **/
+public class CustomDelayMessageService extends ConfigManager {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+
+    public static final String SCHEDULE_TOPIC = "DRAGON_TOPIC_XXX";
+
+    private static final Pattern pattern = Pattern.compile("([0-9]+d)*([0-9]+h)*([0-9]+m)*([0-9]+s)*");
+
+    private static final HashMap<String, Long> timeUnitTable = new HashMap<String, Long>() {{
+        this.put("s", 1000L);
+        this.put("m", 1000L * 60);
+        this.put("h", 1000L * 60 * 60);
+        this.put("d", 1000L * 60 * 60 * 24);
+    }};
+
+    //最大限度的处理层级
+    public static int MAX_LIMIT_LEVEL = 0;

Review comment:
       i wang change this code , use System.getProperty("custom.delaytime") set delay config




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] Jason918 commented on pull request #3337: [ISSUE #3359] Add function--Custom delay time

Posted by GitBox <gi...@apache.org>.
Jason918 commented on pull request #3337:
URL: https://github.com/apache/rocketmq/pull/3337#issuecomment-932673120


   It's better to compare this with the solution in #2290 to see in what situation this approach fits better.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] zongtanghu commented on pull request #3337: [ISSUE #3359] Add function--Custom delay time

Posted by GitBox <gi...@apache.org>.
zongtanghu commented on pull request #3337:
URL: https://github.com/apache/rocketmq/pull/3337#issuecomment-920967043


   Okay!Nice!
   
   
    原始邮件 
   发件人: ***@***.***>
   收件人: ***@***.***>
   抄送: Hu ***@***.***>; ***@***.***>
   发送时间: 2021年9月16日(周四) 22:03
   主题: Re: [apache/rocketmq] [ISSUE #3359] Add function--Custom delay time(#3337)
   
   
   Could you please to write a issue firstly, you can refer other issues' formate! @dragonTalon
   ok,i change my pr add issue;You can take a look
   —
   You are receiving this because you commented.
   Reply to this email directly, view it on GitHub, or unsubscribe.
   Triage notifications on the go with GitHub Mobile for iOS or Android.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] yourlin commented on pull request #3337: [ISSUE #3359] Add function--Custom delay time

Posted by GitBox <gi...@apache.org>.
yourlin commented on pull request #3337:
URL: https://github.com/apache/rocketmq/pull/3337#issuecomment-930700610


   I hope this feature would launch soon. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] dragonTalon commented on a change in pull request #3337: [ISSUE #3359] Add function--Custom delay time

Posted by GitBox <gi...@apache.org>.
dragonTalon commented on a change in pull request #3337:
URL: https://github.com/apache/rocketmq/pull/3337#discussion_r718999065



##########
File path: common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
##########
@@ -52,6 +52,10 @@
     public static final String PROPERTY_PUSH_REPLY_TIME = "PUSH_REPLY_TIME";
     public static final String PROPERTY_CLUSTER = "CLUSTER";
     public static final String PROPERTY_MESSAGE_TYPE = "MSG_TYPE";
+    public static final String PROPERTY_SPECIFY_DELAY_TIME = "SPECIFY_DELAY_TIME";

Review comment:
       ok 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] dragonTalon commented on a change in pull request #3337: [ISSUE #3359] Add function--Custom delay time

Posted by GitBox <gi...@apache.org>.
dragonTalon commented on a change in pull request #3337:
URL: https://github.com/apache/rocketmq/pull/3337#discussion_r719003609



##########
File path: store/src/main/java/org/apache/rocketmq/store/schedule/CustomDelayMessageService.java
##########
@@ -0,0 +1,496 @@
+package org.apache.rocketmq.store.schedule;
+
+import io.netty.util.concurrent.DefaultThreadFactory;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.ConfigManager;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicFilterType;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.running.RunningStats;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.store.*;
+import org.apache.rocketmq.store.config.StorePathConfigHelper;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * @ClassName CutomDelayMessageService
+ * @Version 1.0
+ * @Author dragonboy
+ * @Date 2021/9/6 14:12
+ * @Description
+ **/
+public class CustomDelayMessageService extends ConfigManager {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+
+    public static final String SCHEDULE_TOPIC = "DRAGON_TOPIC_XXX";
+
+    private static final Pattern pattern = Pattern.compile("([0-9]+d)*([0-9]+h)*([0-9]+m)*([0-9]+s)*");
+
+    private static final HashMap<String, Long> timeUnitTable = new HashMap<String, Long>() {{
+        this.put("s", 1000L);
+        this.put("m", 1000L * 60);
+        this.put("h", 1000L * 60 * 60);
+        this.put("d", 1000L * 60 * 60 * 24);
+    }};
+
+    //最大限度的处理层级
+    public static int MAX_LIMIT_LEVEL = 0;

Review comment:
       this code support custom time in 7 day 
   so,this is record max level.
   if level more max level Cause out of bounds




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] dragonTalon commented on a change in pull request #3337: [ISSUE #3359] Add function--Custom delay time

Posted by GitBox <gi...@apache.org>.
dragonTalon commented on a change in pull request #3337:
URL: https://github.com/apache/rocketmq/pull/3337#discussion_r741944439



##########
File path: broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
##########
@@ -212,10 +212,10 @@ public static BrokerController createBrokerController(String[] args) {
             MixAll.printObjectProperties(log, messageStoreConfig);
 
             final BrokerController controller = new BrokerController(
-                brokerConfig,
-                nettyServerConfig,
-                nettyClientConfig,
-                messageStoreConfig);
+                    brokerConfig,

Review comment:
       ok ,i reduction this file




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] dragonTalon commented on pull request #3337: [ISSUE #3359] Add function--Custom delay time

Posted by GitBox <gi...@apache.org>.
dragonTalon commented on pull request #3337:
URL: https://github.com/apache/rocketmq/pull/3337#issuecomment-984461924


   @Jason918 check my branch,please


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] zongtanghu commented on pull request #3337: [ISSUE #XXXX] Add function--Custom delay time

Posted by GitBox <gi...@apache.org>.
zongtanghu commented on pull request #3337:
URL: https://github.com/apache/rocketmq/pull/3337#issuecomment-920807703


   Could you please to write a issue firstly, you can refer other issues' formate! @dragonTalon 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org