You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/12/07 09:22:08 UTC

[rocketmq] branch develop updated: [ISSUE #5654] support calculate inflight messages for pop (#5655)

This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 4c7f0ecce [ISSUE #5654] support calculate inflight messages for pop (#5655)
4c7f0ecce is described below

commit 4c7f0eccecff258c4f6df2a66198e14bc5dad727
Author: lk <xd...@outlook.com>
AuthorDate: Wed Dec 7 17:21:42 2022 +0800

    [ISSUE #5654] support calculate inflight messages for pop (#5655)
---
 .../apache/rocketmq/broker/BrokerController.java   |   7 +
 .../broker/processor/AckMessageProcessor.java      |  11 ++
 .../broker/processor/AdminBrokerProcessor.java     |   3 +
 .../processor/PopInflightMessageCounter.java       | 156 +++++++++++++++++++++
 .../broker/processor/PopMessageProcessor.java      |  14 ++
 .../broker/processor/PopReviveService.java         |   1 +
 .../processor/PopInflightMessageCounterTest.java   | 101 +++++++++++++
 7 files changed, 293 insertions(+)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 5697afce3..0a5df7cb0 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -81,6 +81,7 @@ import org.apache.rocketmq.broker.processor.EndTransactionProcessor;
 import org.apache.rocketmq.broker.processor.NotificationProcessor;
 import org.apache.rocketmq.broker.processor.PeekMessageProcessor;
 import org.apache.rocketmq.broker.processor.PollingInfoProcessor;
+import org.apache.rocketmq.broker.processor.PopInflightMessageCounter;
 import org.apache.rocketmq.broker.processor.PopMessageProcessor;
 import org.apache.rocketmq.broker.processor.PullMessageProcessor;
 import org.apache.rocketmq.broker.processor.QueryAssignmentProcessor;
@@ -173,6 +174,7 @@ public class BrokerController {
     protected final ConsumerManager consumerManager;
     protected final ConsumerFilterManager consumerFilterManager;
     protected final ConsumerOrderInfoManager consumerOrderInfoManager;
+    protected final PopInflightMessageCounter popInflightMessageCounter;
     protected final ProducerManager producerManager;
     protected final ScheduleMessageService scheduleMessageService;
     protected final ClientHousekeepingService clientHousekeepingService;
@@ -317,6 +319,7 @@ public class BrokerController {
         this.producerManager = new ProducerManager(this.brokerStatsManager);
         this.consumerFilterManager = new ConsumerFilterManager(this);
         this.consumerOrderInfoManager = new ConsumerOrderInfoManager(this);
+        this.popInflightMessageCounter = new PopInflightMessageCounter(this);
         this.clientHousekeepingService = new ClientHousekeepingService(this);
         this.broker2Client = new Broker2Client(this);
         this.subscriptionGroupManager = messageStoreConfig.isEnableLmq() ? new LmqSubscriptionGroupManager(this) : new SubscriptionGroupManager(this);
@@ -1177,6 +1180,10 @@ public class BrokerController {
         return consumerOrderInfoManager;
     }
 
+    public PopInflightMessageCounter getPopInflightMessageCounter() {
+        return popInflightMessageCounter;
+    }
+
     public ConsumerOffsetManager getConsumerOffsetManager() {
         return consumerOffsetManager;
     }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
index d2886542b..80f06aed0 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
@@ -184,10 +184,12 @@ public class AckMessageProcessor implements NettyRequestProcessor {
             } finally {
                 this.brokerController.getPopMessageProcessor().getQueueLockManager().unLock(lockKey);
             }
+            decInFlightMessageNum(requestHeader);
             return response;
         }
 
         if (this.brokerController.getPopMessageProcessor().getPopBufferMergeService().addAk(rqId, ackMsg)) {
+            decInFlightMessageNum(requestHeader);
             return response;
         }
 
@@ -209,7 +211,16 @@ public class AckMessageProcessor implements NettyRequestProcessor {
             && putMessageResult.getPutMessageStatus() != PutMessageStatus.SLAVE_NOT_AVAILABLE) {
             POP_LOGGER.error("put ack msg error:" + putMessageResult);
         }
+        decInFlightMessageNum(requestHeader);
         return response;
     }
 
+    private void decInFlightMessageNum(AckMessageRequestHeader requestHeader) {
+        this.brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(
+            requestHeader.getTopic(),
+            requestHeader.getConsumerGroup(),
+            requestHeader.getExtraInfo()
+        );
+    }
+
 }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index dfbd886f2..ad86ab34a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -522,6 +522,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
         this.brokerController.getTopicConfigManager().deleteTopicConfig(requestHeader.getTopic());
         this.brokerController.getTopicQueueMappingManager().delete(requestHeader.getTopic());
         this.brokerController.getConsumerOffsetManager().cleanOffsetByTopic(requestHeader.getTopic());
+        this.brokerController.getPopInflightMessageCounter().clearInFlightMessageNumByTopicName(requestHeader.getTopic());
         this.brokerController.getMessageStore()
             .cleanUnusedTopic(this.brokerController.getTopicConfigManager().getTopicConfigTable().keySet());
         if (this.brokerController.getBrokerConfig().isAutoDeleteUnusedStats()) {
@@ -1325,6 +1326,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
 
         if (requestHeader.isCleanOffset()) {
             this.brokerController.getConsumerOffsetManager().removeOffset(requestHeader.getGroupName());
+            this.brokerController.getPopInflightMessageCounter().clearInFlightMessageNumByGroupName(requestHeader.getGroupName());
         }
 
         if (this.brokerController.getBrokerConfig().isAutoDeleteUnusedStats()) {
@@ -1765,6 +1767,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
         ResetOffsetBody body = new ResetOffsetBody();
         String brokerName = brokerController.getBrokerConfig().getBrokerName();
         for (Map.Entry<Integer, Long> entry : queueOffsetMap.entrySet()) {
+            brokerController.getPopInflightMessageCounter().clearInFlightMessageNum(topic, group, entry.getKey());
             body.getOffsetTable().put(new MessageQueue(topic, brokerName, entry.getKey()), entry.getValue());
         }
 
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopInflightMessageCounter.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopInflightMessageCounter.java
new file mode 100644
index 000000000..584cc54ba
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopInflightMessageCounter.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.processor;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.Pair;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
+import org.apache.rocketmq.store.pop.PopCheckPoint;
+
+public class PopInflightMessageCounter {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+
+    private static final String TOPIC_GROUP_SEPARATOR = "@";
+    private final Map<String /* topic@group */, Map<Integer /* queueId */, AtomicLong>> topicInFlightMessageNum =
+        new ConcurrentHashMap<>(512);
+    private final BrokerController brokerController;
+
+    public PopInflightMessageCounter(BrokerController brokerController) {
+        this.brokerController = brokerController;
+    }
+
+    public void incrementInFlightMessageNum(String topic, String group, int queueId, int num) {
+        if (num <= 0) {
+            return;
+        }
+        topicInFlightMessageNum.compute(buildKey(topic, group), (key, queueNum) -> {
+            if (queueNum == null) {
+                queueNum = new ConcurrentHashMap<>(8);
+            }
+            queueNum.compute(queueId, (queueIdKey, counter) -> {
+                if (counter == null) {
+                    return new AtomicLong(num);
+                }
+                if (counter.addAndGet(num) <= 0) {
+                    return null;
+                }
+                return counter;
+            });
+            return queueNum;
+        });
+    }
+
+    public void decrementInFlightMessageNum(String topic, String group, String ckInfo) {
+        String[] ckInfoList = ExtraInfoUtil.split(ckInfo);
+        long popTime = ExtraInfoUtil.getPopTime(ckInfoList);
+        if (popTime < this.brokerController.getShouldStartTime()) {
+            return;
+        }
+        decrementInFlightMessageNum(topic, group, ExtraInfoUtil.getQueueId(ckInfoList));
+    }
+
+    public void decrementInFlightMessageNum(PopCheckPoint checkPoint) {
+        if (checkPoint.getPopTime() < this.brokerController.getShouldStartTime()) {
+            return;
+        }
+        decrementInFlightMessageNum(checkPoint.getTopic(), checkPoint.getCId(), checkPoint.getQueueId());
+    }
+
+    public void decrementInFlightMessageNum(String topic, String group, int queueId) {
+        topicInFlightMessageNum.computeIfPresent(buildKey(topic, group), (key, queueNum) -> {
+            queueNum.computeIfPresent(queueId, (queueIdKey, counter) -> {
+                if (counter.decrementAndGet() <= 0) {
+                    return null;
+                }
+                return counter;
+            });
+            if (queueNum.isEmpty()) {
+                return null;
+            }
+            return queueNum;
+        });
+    }
+
+    public void clearInFlightMessageNumByGroupName(String group) {
+        Set<String> topicGroupKey = this.topicInFlightMessageNum.keySet();
+        for (String key : topicGroupKey) {
+            if (key.contains(group)) {
+                Pair<String, String> topicAndGroup = splitKey(key);
+                if (topicAndGroup != null && topicAndGroup.getObject2().equals(group)) {
+                    this.topicInFlightMessageNum.remove(key);
+                    log.info("PopInflightMessageCounter#clearInFlightMessageNumByGroupName: clean by group, topic={}, group={}",
+                        topicAndGroup.getObject1(), topicAndGroup.getObject2());
+                }
+            }
+        }
+    }
+
+    public void clearInFlightMessageNumByTopicName(String topic) {
+        Set<String> topicGroupKey = this.topicInFlightMessageNum.keySet();
+        for (String key : topicGroupKey) {
+            if (key.contains(topic)) {
+                Pair<String, String> topicAndGroup = splitKey(key);
+                if (topicAndGroup != null && topicAndGroup.getObject1().equals(topic)) {
+                    this.topicInFlightMessageNum.remove(key);
+                    log.info("PopInflightMessageCounter#clearInFlightMessageNumByTopicName: clean by topic, topic={}, group={}",
+                        topicAndGroup.getObject1(), topicAndGroup.getObject2());
+                }
+            }
+        }
+    }
+
+    public void clearInFlightMessageNum(String topic, String group, int queueId) {
+        topicInFlightMessageNum.computeIfPresent(buildKey(topic, group), (key, queueNum) -> {
+            queueNum.computeIfPresent(queueId, (queueIdKey, counter) -> null);
+            if (queueNum.isEmpty()) {
+                return null;
+            }
+            return queueNum;
+        });
+    }
+
+    public long getGroupPopInFlightMessageNum(String topic, String group, int queueId) {
+        Map<Integer /* queueId */, AtomicLong> queueCounter = topicInFlightMessageNum.get(buildKey(topic, group));
+        if (queueCounter == null) {
+            return 0;
+        }
+        AtomicLong counter = queueCounter.get(queueId);
+        if (counter == null) {
+            return 0;
+        }
+        return Math.max(0, counter.get());
+    }
+
+    private static Pair<String /* topic */, String /* group */> splitKey(String key) {
+        String[] strings = key.split(TOPIC_GROUP_SEPARATOR);
+        if (strings.length != 2) {
+            return null;
+        }
+        return new Pair<>(strings[0], strings[1]);
+    }
+
+    private static String buildKey(String topic, String group) {
+        return topic + TOPIC_GROUP_SEPARATOR + group;
+    }
+}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 6cd3e55fa..393631e45 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -512,6 +512,14 @@ public class PopMessageProcessor implements NettyRequestProcessor {
                 return this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum;
             }
 
+            if (isOrder) {
+                this.brokerController.getPopInflightMessageCounter().clearInFlightMessageNum(
+                    topic,
+                    requestHeader.getConsumerGroup(),
+                    queueId
+                );
+            }
+
             if (getMessageResult.getMessageMapedList().size() >= requestHeader.getMaxMsgNums()) {
                 restNum =
                     this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum;
@@ -619,6 +627,12 @@ public class PopMessageProcessor implements NettyRequestProcessor {
                     }
                 }
             }
+            this.brokerController.getPopInflightMessageCounter().incrementInFlightMessageNum(
+                topic,
+                requestHeader.getConsumerGroup(),
+                queueId,
+                getMessageTmpResult.getMessageCount()
+            );
         }
         return restNum;
     }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
index 9363a7204..1d0d53293 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
@@ -124,6 +124,7 @@ public class PopReviveService extends ServiceThread {
             putMessageResult.getAppendMessageResult().getStatus() != AppendMessageStatus.PUT_OK) {
             throw new Exception("reviveQueueId=" + queueId + ", revive error, msg is: " + msgInner);
         }
+        this.brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(popCheckPoint);
         this.brokerController.getBrokerStatsManager().incBrokerPutNums(1);
         this.brokerController.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic());
         this.brokerController.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getWroteBytes());
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopInflightMessageCounterTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopInflightMessageCounterTest.java
new file mode 100644
index 000000000..3b509196b
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopInflightMessageCounterTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.processor;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
+import org.apache.rocketmq.store.pop.PopCheckPoint;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class PopInflightMessageCounterTest {
+
+    @Test
+    public void testNum() {
+        BrokerController brokerController = mock(BrokerController.class);
+        long brokerStartTime = System.currentTimeMillis();
+        when(brokerController.getShouldStartTime()).thenReturn(brokerStartTime);
+        PopInflightMessageCounter counter = new PopInflightMessageCounter(brokerController);
+
+        final String topic = "topic";
+        final String group = "group";
+
+        assertEquals(0, counter.getGroupPopInFlightMessageNum(topic, group, 0));
+
+        counter.incrementInFlightMessageNum(topic, group, 0, 3);
+        assertEquals(3, counter.getGroupPopInFlightMessageNum(topic, group, 0));
+
+        counter.decrementInFlightMessageNum(topic, group, ExtraInfoUtil.buildExtraInfo(0, System.currentTimeMillis(),
+            0, 0, topic, "broker", 0));
+        assertEquals(2, counter.getGroupPopInFlightMessageNum(topic, group, 0));
+
+        counter.decrementInFlightMessageNum(topic, group, ExtraInfoUtil.buildExtraInfo(0, System.currentTimeMillis() - 1000,
+            0, 0, topic, "broker", 0));
+        assertEquals(2, counter.getGroupPopInFlightMessageNum(topic, group, 0));
+
+        PopCheckPoint popCheckPoint = new PopCheckPoint();
+        popCheckPoint.setTopic(topic);
+        popCheckPoint.setCId(group);
+        popCheckPoint.setQueueId((byte) 0);
+        popCheckPoint.setPopTime(System.currentTimeMillis());
+
+        counter.decrementInFlightMessageNum(popCheckPoint);
+        assertEquals(1, counter.getGroupPopInFlightMessageNum(topic, group, 0));
+
+        counter.decrementInFlightMessageNum(topic, group, ExtraInfoUtil.buildExtraInfo(0, System.currentTimeMillis(),
+            0, 0, topic, "broker", 0));
+        assertEquals(0, counter.getGroupPopInFlightMessageNum(topic, group, 0));
+
+        counter.decrementInFlightMessageNum(topic, group, ExtraInfoUtil.buildExtraInfo(0, System.currentTimeMillis(),
+            0, 0, topic, "broker", 0));
+        assertEquals(0, counter.getGroupPopInFlightMessageNum(topic, group, 0));
+    }
+
+    @Test
+    public void testClearInFlightMessageNum() {
+        BrokerController brokerController = mock(BrokerController.class);
+        long brokerStartTime = System.currentTimeMillis();
+        when(brokerController.getShouldStartTime()).thenReturn(brokerStartTime);
+        PopInflightMessageCounter counter = new PopInflightMessageCounter(brokerController);
+
+        final String topic = "topic";
+        final String group = "group";
+
+        assertEquals(0, counter.getGroupPopInFlightMessageNum(topic, group, 0));
+
+        counter.incrementInFlightMessageNum(topic, group, 0, 3);
+        assertEquals(3, counter.getGroupPopInFlightMessageNum(topic, group, 0));
+
+        counter.clearInFlightMessageNumByTopicName("errorTopic");
+        assertEquals(3, counter.getGroupPopInFlightMessageNum(topic, group, 0));
+
+        counter.clearInFlightMessageNumByTopicName(topic);
+        assertEquals(0, counter.getGroupPopInFlightMessageNum(topic, group, 0));
+
+        counter.incrementInFlightMessageNum(topic, group, 0, 3);
+        assertEquals(3, counter.getGroupPopInFlightMessageNum(topic, group, 0));
+
+        counter.clearInFlightMessageNumByGroupName("errorGroup");
+        assertEquals(3, counter.getGroupPopInFlightMessageNum(topic, group, 0));
+
+        counter.clearInFlightMessageNumByGroupName(group);
+        assertEquals(0, counter.getGroupPopInFlightMessageNum(topic, group, 0));
+    }
+}
\ No newline at end of file