You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/01/07 04:00:52 UTC

[GitHub] [rocketmq] duhenglucky commented on a change in pull request #3694: [RIP-28] light message queue(LMQ)

duhenglucky commented on a change in pull request #3694:
URL: https://github.com/apache/rocketmq/pull/3694#discussion_r780012318



##########
File path: broker/src/test/java/org/apache/rocketmq/broker/BrokerPathConfigHelperTest.java
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;

Review comment:
       Don't use * to replace all imports.

##########
File path: test/src/main/java/org/apache/rocketmq/test/lmq/benchmark/BenchLmqStore.java
##########
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.test.lmq.benchmark;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.PullCallback;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.PullStatus;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.test.util.StatUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+public class BenchLmqStore {
+    private static Logger logger = LoggerFactory.getLogger(BenchLmqStore.class);
+    private static String namesrv = System.getProperty("namesrv", "127.0.0.1:9876");
+    private static String lmqTopic = System.getProperty("lmqTopic", "lmqTestTopic");
+    private static boolean enableSub = Boolean.parseBoolean(System.getProperty("enableSub", "true"));
+    private static String queuePrefix = System.getProperty("queuePrefix", "lmqTest");
+    private static int tps = Integer.parseInt(System.getProperty("tps", "1"));
+    private static int lmqNum = Integer.parseInt(System.getProperty("lmqNum", "1"));
+    private static int sendThreadNum = Integer.parseInt(System.getProperty("sendThreadNum", "64"));
+    private static int consumerThreadNum = Integer.parseInt(System.getProperty("consumerThreadNum", "64"));
+    private static String brokerName = System.getProperty("brokerName", "broker-a");
+    private static int size = Integer.parseInt(System.getProperty("size", "128"));
+    private static int suspendTime = Integer.parseInt(System.getProperty("suspendTime", "2000"));
+    private static final boolean RETRY_NO_MATCHED_MSG = Boolean.parseBoolean(System.getProperty("retry_no_matched_msg", "false"));
+    private static boolean benchOffset = Boolean.parseBoolean(System.getProperty("benchOffset", "false"));
+    private static int benchOffsetNum = Integer.parseInt(System.getProperty("benchOffsetNum", "1"));
+    private static Map<MessageQueue, Long> offsetMap = new ConcurrentHashMap<>(256);
+    private static Map<MessageQueue, Boolean> pullStatus = new ConcurrentHashMap<>(256);
+    private static Map<Integer, Map<MessageQueue, Long>> pullEvent = new ConcurrentHashMap<>(256);
+    public static DefaultMQProducer defaultMQProducer;
+    private static int pullConsumerNum = Integer.parseInt(System.getProperty("pullConsumerNum", "8"));
+    public static DefaultMQPullConsumer[] defaultMQPullConsumers = new DefaultMQPullConsumer[pullConsumerNum];
+    private static AtomicLong rid = new AtomicLong();
+    private static final String LMQ_PREFIX = "%LMQ%";
+
+    public static void main(String[] args) throws InterruptedException, MQClientException, MQBrokerException,
+        RemotingException {
+        defaultMQProducer = new DefaultMQProducer();
+        defaultMQProducer.setProducerGroup("PID_LMQ_TEST");
+        defaultMQProducer.setVipChannelEnabled(false);
+        defaultMQProducer.setNamesrvAddr(namesrv);
+        defaultMQProducer.start();
+        //defaultMQProducer.createTopic(lmqTopic, lmqTopic, 8);
+        for (int i = 0; i < pullConsumerNum; i++) {
+            DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer();
+            defaultMQPullConsumers[i] = defaultMQPullConsumer;
+            defaultMQPullConsumer.setNamesrvAddr(namesrv);
+            defaultMQPullConsumer.setVipChannelEnabled(false);
+            defaultMQPullConsumer.setConsumerGroup("CID_RMQ_SYS_LMQ_TEST_" + i);
+            defaultMQPullConsumer.setInstanceName("CID_RMQ_SYS_LMQ_TEST_" + i);
+            defaultMQPullConsumer.setRegisterTopics(new HashSet<>(Arrays.asList(lmqTopic)));
+            defaultMQPullConsumer.setBrokerSuspendMaxTimeMillis(suspendTime);
+            defaultMQPullConsumer.setConsumerTimeoutMillisWhenSuspend(suspendTime + 1000);
+            defaultMQPullConsumer.start();
+        }
+        Thread.sleep(3000L);
+        if (benchOffset) {
+            doBenchOffset();
+            return;
+        }
+        ScheduledThreadPoolExecutor consumerPool = new ScheduledThreadPoolExecutor(consumerThreadNum, new ThreadFactoryImpl("test"));
+        for (int i = 0; i < consumerThreadNum; i++) {
+            final int idx = i;
+            consumerPool.scheduleWithFixedDelay(() -> {
+                try {
+                    Map<MessageQueue, Long> map = pullEvent.get(idx);
+                    if (map == null) {
+                        return;
+                    }
+                    for (Map.Entry<MessageQueue, Long> entry : map.entrySet()) {
+                        try {
+                            Boolean status = pullStatus.get(entry.getKey());
+                            if (Boolean.TRUE.equals(status)) {
+                                continue;
+                            }
+                            doPull(map, entry.getKey(), entry.getValue());
+                        } catch (Exception e) {
+                            logger.error(" ", e);
+                        }
+                    }
+                } catch (Exception e) {

Review comment:
       Same with the last comment,nesting level should be reduced and error log may be can more clear

##########
File path: store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
##########
@@ -422,6 +429,47 @@ public void putMessagePositionInfoWrapper(DispatchRequest request) {
         this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
     }
 
+    private void multiDispatchQueue(DispatchRequest request, int maxRetries) {
+        Map<String, String> prop = request.getPropertiesMap();
+        String multiDispatchQueue = prop.get(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
+        String multiQueueOffset = prop.get(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET);
+        String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
+        String[] queueOffsets = multiQueueOffset.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
+        if (queues.length != queueOffsets.length) {
+            log.error("[bug] queues.length!=queueOffsets.length ", request.getTopic());
+            return;
+        }
+        for (int i = 0; i < queues.length; i++) {
+            String queueName = queues[i];
+            long queueOffset = Long.parseLong(queueOffsets[i]);
+            int queueId = request.getQueueId();
+            if (this.defaultMessageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(queueName)) {
+                queueId = 0;
+            }
+            ConsumeQueue cq = this.defaultMessageStore.findConsumeQueue(queueName, queueId);
+            boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
+            for (int j = 0; j < maxRetries && canWrite; j++) {
+                boolean result = cq.putMessagePositionInfo(request.getCommitLogOffset(), request.getMsgSize(),
+                    request.getTagsCode(),
+                    queueOffset);
+                if (result) {
+                    break;
+                } else {
+                    log.warn("[BUG]put commit log position info to " + queueName + ":" + queueId + " " + request.getCommitLogOffset()
+                        + " failed, retry " + j + " times");
+
+                    try {

Review comment:
       Would you like to reduce the nesting level?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org