You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2021/09/30 01:41:05 UTC

[GitHub] [rocketmq] Jason918 commented on a change in pull request #3337: [ISSUE #3359] Add function--Custom delay time

Jason918 commented on a change in pull request #3337:
URL: https://github.com/apache/rocketmq/pull/3337#discussion_r718991689



##########
File path: common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
##########
@@ -52,6 +52,10 @@
     public static final String PROPERTY_PUSH_REPLY_TIME = "PUSH_REPLY_TIME";
     public static final String PROPERTY_CLUSTER = "CLUSTER";
     public static final String PROPERTY_MESSAGE_TYPE = "MSG_TYPE";
+    public static final String PROPERTY_SPECIFY_DELAY_TIME = "SPECIFY_DELAY_TIME";

Review comment:
       Maybe "CUSTOM_DELAY_TIME" is a better name? 

##########
File path: namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java
##########
@@ -48,6 +50,7 @@
     private static CommandLine commandLine = null;
 
     public static void main(String[] args) {
+        System.setProperty(MixAll.ROCKETMQ_HOME_PROPERTY, "/Users/dragonboy/IdeaProjects/rocketmq/distribution");

Review comment:
       Test code? 

##########
File path: store/src/main/java/org/apache/rocketmq/store/schedule/CustomDelayMessageService.java
##########
@@ -0,0 +1,496 @@
+package org.apache.rocketmq.store.schedule;
+
+import io.netty.util.concurrent.DefaultThreadFactory;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.ConfigManager;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicFilterType;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.running.RunningStats;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.store.*;
+import org.apache.rocketmq.store.config.StorePathConfigHelper;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * @ClassName CutomDelayMessageService
+ * @Version 1.0
+ * @Author dragonboy
+ * @Date 2021/9/6 14:12
+ * @Description
+ **/
+public class CustomDelayMessageService extends ConfigManager {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+
+    public static final String SCHEDULE_TOPIC = "DRAGON_TOPIC_XXX";
+
+    private static final Pattern pattern = Pattern.compile("([0-9]+d)*([0-9]+h)*([0-9]+m)*([0-9]+s)*");
+
+    private static final HashMap<String, Long> timeUnitTable = new HashMap<String, Long>() {{
+        this.put("s", 1000L);
+        this.put("m", 1000L * 60);
+        this.put("h", 1000L * 60 * 60);
+        this.put("d", 1000L * 60 * 60 * 24);
+    }};
+
+    //最大限度的处理层级
+    public static int MAX_LIMIT_LEVEL = 0;

Review comment:
       This should not be a hard-coded constant; it should be configurable.

##########
File path: store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java
##########
@@ -48,6 +48,10 @@ public static String getDelayOffsetStorePath(final String rootDir) {
         return rootDir + File.separator + "config" + File.separator + "delayOffset.json";
     }
 
+    public static String getSpcifyDelayOffsetStorePath(final String rootDir) {
+        return rootDir + File.separator + "dragon" + File.separator + "delayOffset.json";

Review comment:
       "dragon" here is a little confusing.

##########
File path: store/src/main/java/org/apache/rocketmq/store/schedule/CustomDelayMessageService.java
##########
@@ -0,0 +1,496 @@
+package org.apache.rocketmq.store.schedule;
+
+import io.netty.util.concurrent.DefaultThreadFactory;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.ConfigManager;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicFilterType;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.running.RunningStats;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.store.*;
+import org.apache.rocketmq.store.config.StorePathConfigHelper;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * @ClassName CutomDelayMessageService
+ * @Version 1.0
+ * @Author dragonboy
+ * @Date 2021/9/6 14:12
+ * @Description
+ **/
+public class CustomDelayMessageService extends ConfigManager {

Review comment:
       Please add some unit tests.

##########
File path: broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
##########
@@ -64,7 +64,7 @@ public static BrokerController start(BrokerController controller) {
             controller.start();
 
             String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", "
-                + controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
+                    + controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();

Review comment:
       I think it's better not to change the formatting of existing code when there is no logical modification.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org