You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/12/07 09:22:08 UTC
[rocketmq] branch develop updated: [ISSUE #5654] support calculate inflight messages for pop (#5655)
This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 4c7f0ecce [ISSUE #5654] support calculate inflight messages for pop (#5655)
4c7f0ecce is described below
commit 4c7f0eccecff258c4f6df2a66198e14bc5dad727
Author: lk <xd...@outlook.com>
AuthorDate: Wed Dec 7 17:21:42 2022 +0800
[ISSUE #5654] support calculate inflight messages for pop (#5655)
---
.../apache/rocketmq/broker/BrokerController.java | 7 +
.../broker/processor/AckMessageProcessor.java | 11 ++
.../broker/processor/AdminBrokerProcessor.java | 3 +
.../processor/PopInflightMessageCounter.java | 156 +++++++++++++++++++++
.../broker/processor/PopMessageProcessor.java | 14 ++
.../broker/processor/PopReviveService.java | 1 +
.../processor/PopInflightMessageCounterTest.java | 101 +++++++++++++
7 files changed, 293 insertions(+)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 5697afce3..0a5df7cb0 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -81,6 +81,7 @@ import org.apache.rocketmq.broker.processor.EndTransactionProcessor;
import org.apache.rocketmq.broker.processor.NotificationProcessor;
import org.apache.rocketmq.broker.processor.PeekMessageProcessor;
import org.apache.rocketmq.broker.processor.PollingInfoProcessor;
+import org.apache.rocketmq.broker.processor.PopInflightMessageCounter;
import org.apache.rocketmq.broker.processor.PopMessageProcessor;
import org.apache.rocketmq.broker.processor.PullMessageProcessor;
import org.apache.rocketmq.broker.processor.QueryAssignmentProcessor;
@@ -173,6 +174,7 @@ public class BrokerController {
protected final ConsumerManager consumerManager;
protected final ConsumerFilterManager consumerFilterManager;
protected final ConsumerOrderInfoManager consumerOrderInfoManager;
+ protected final PopInflightMessageCounter popInflightMessageCounter;
protected final ProducerManager producerManager;
protected final ScheduleMessageService scheduleMessageService;
protected final ClientHousekeepingService clientHousekeepingService;
@@ -317,6 +319,7 @@ public class BrokerController {
this.producerManager = new ProducerManager(this.brokerStatsManager);
this.consumerFilterManager = new ConsumerFilterManager(this);
this.consumerOrderInfoManager = new ConsumerOrderInfoManager(this);
+ this.popInflightMessageCounter = new PopInflightMessageCounter(this);
this.clientHousekeepingService = new ClientHousekeepingService(this);
this.broker2Client = new Broker2Client(this);
this.subscriptionGroupManager = messageStoreConfig.isEnableLmq() ? new LmqSubscriptionGroupManager(this) : new SubscriptionGroupManager(this);
@@ -1177,6 +1180,10 @@ public class BrokerController {
return consumerOrderInfoManager;
}
+ public PopInflightMessageCounter getPopInflightMessageCounter() {
+ return popInflightMessageCounter;
+ }
+
public ConsumerOffsetManager getConsumerOffsetManager() {
return consumerOffsetManager;
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
index d2886542b..80f06aed0 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
@@ -184,10 +184,12 @@ public class AckMessageProcessor implements NettyRequestProcessor {
} finally {
this.brokerController.getPopMessageProcessor().getQueueLockManager().unLock(lockKey);
}
+ decInFlightMessageNum(requestHeader);
return response;
}
if (this.brokerController.getPopMessageProcessor().getPopBufferMergeService().addAk(rqId, ackMsg)) {
+ decInFlightMessageNum(requestHeader);
return response;
}
@@ -209,7 +211,16 @@ public class AckMessageProcessor implements NettyRequestProcessor {
&& putMessageResult.getPutMessageStatus() != PutMessageStatus.SLAVE_NOT_AVAILABLE) {
POP_LOGGER.error("put ack msg error:" + putMessageResult);
}
+ decInFlightMessageNum(requestHeader);
return response;
}
+ private void decInFlightMessageNum(AckMessageRequestHeader requestHeader) {
+ this.brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(
+ requestHeader.getTopic(),
+ requestHeader.getConsumerGroup(),
+ requestHeader.getExtraInfo()
+ );
+ }
+
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index dfbd886f2..ad86ab34a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -522,6 +522,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
this.brokerController.getTopicConfigManager().deleteTopicConfig(requestHeader.getTopic());
this.brokerController.getTopicQueueMappingManager().delete(requestHeader.getTopic());
this.brokerController.getConsumerOffsetManager().cleanOffsetByTopic(requestHeader.getTopic());
+ this.brokerController.getPopInflightMessageCounter().clearInFlightMessageNumByTopicName(requestHeader.getTopic());
this.brokerController.getMessageStore()
.cleanUnusedTopic(this.brokerController.getTopicConfigManager().getTopicConfigTable().keySet());
if (this.brokerController.getBrokerConfig().isAutoDeleteUnusedStats()) {
@@ -1325,6 +1326,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
if (requestHeader.isCleanOffset()) {
this.brokerController.getConsumerOffsetManager().removeOffset(requestHeader.getGroupName());
+ this.brokerController.getPopInflightMessageCounter().clearInFlightMessageNumByGroupName(requestHeader.getGroupName());
}
if (this.brokerController.getBrokerConfig().isAutoDeleteUnusedStats()) {
@@ -1765,6 +1767,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
ResetOffsetBody body = new ResetOffsetBody();
String brokerName = brokerController.getBrokerConfig().getBrokerName();
for (Map.Entry<Integer, Long> entry : queueOffsetMap.entrySet()) {
+ brokerController.getPopInflightMessageCounter().clearInFlightMessageNum(topic, group, entry.getKey());
body.getOffsetTable().put(new MessageQueue(topic, brokerName, entry.getKey()), entry.getValue());
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopInflightMessageCounter.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopInflightMessageCounter.java
new file mode 100644
index 000000000..584cc54ba
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopInflightMessageCounter.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.processor;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.Pair;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
+import org.apache.rocketmq.store.pop.PopCheckPoint;
+
+public class PopInflightMessageCounter {
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+
+ private static final String TOPIC_GROUP_SEPARATOR = "@";
+ private final Map<String /* topic@group */, Map<Integer /* queueId */, AtomicLong>> topicInFlightMessageNum =
+ new ConcurrentHashMap<>(512);
+ private final BrokerController brokerController;
+
+ public PopInflightMessageCounter(BrokerController brokerController) {
+ this.brokerController = brokerController;
+ }
+
+ public void incrementInFlightMessageNum(String topic, String group, int queueId, int num) {
+ if (num <= 0) {
+ return;
+ }
+ topicInFlightMessageNum.compute(buildKey(topic, group), (key, queueNum) -> {
+ if (queueNum == null) {
+ queueNum = new ConcurrentHashMap<>(8);
+ }
+ queueNum.compute(queueId, (queueIdKey, counter) -> {
+ if (counter == null) {
+ return new AtomicLong(num);
+ }
+ if (counter.addAndGet(num) <= 0) {
+ return null;
+ }
+ return counter;
+ });
+ return queueNum;
+ });
+ }
+
+ public void decrementInFlightMessageNum(String topic, String group, String ckInfo) {
+ String[] ckInfoList = ExtraInfoUtil.split(ckInfo);
+ long popTime = ExtraInfoUtil.getPopTime(ckInfoList);
+ if (popTime < this.brokerController.getShouldStartTime()) {
+ return;
+ }
+ decrementInFlightMessageNum(topic, group, ExtraInfoUtil.getQueueId(ckInfoList));
+ }
+
+ public void decrementInFlightMessageNum(PopCheckPoint checkPoint) {
+ if (checkPoint.getPopTime() < this.brokerController.getShouldStartTime()) {
+ return;
+ }
+ decrementInFlightMessageNum(checkPoint.getTopic(), checkPoint.getCId(), checkPoint.getQueueId());
+ }
+
+ public void decrementInFlightMessageNum(String topic, String group, int queueId) {
+ topicInFlightMessageNum.computeIfPresent(buildKey(topic, group), (key, queueNum) -> {
+ queueNum.computeIfPresent(queueId, (queueIdKey, counter) -> {
+ if (counter.decrementAndGet() <= 0) {
+ return null;
+ }
+ return counter;
+ });
+ if (queueNum.isEmpty()) {
+ return null;
+ }
+ return queueNum;
+ });
+ }
+
+ public void clearInFlightMessageNumByGroupName(String group) {
+ Set<String> topicGroupKey = this.topicInFlightMessageNum.keySet();
+ for (String key : topicGroupKey) {
+ if (key.contains(group)) {
+ Pair<String, String> topicAndGroup = splitKey(key);
+ if (topicAndGroup != null && topicAndGroup.getObject2().equals(group)) {
+ this.topicInFlightMessageNum.remove(key);
+ log.info("PopInflightMessageCounter#clearInFlightMessageNumByGroupName: clean by group, topic={}, group={}",
+ topicAndGroup.getObject1(), topicAndGroup.getObject2());
+ }
+ }
+ }
+ }
+
+ public void clearInFlightMessageNumByTopicName(String topic) {
+ Set<String> topicGroupKey = this.topicInFlightMessageNum.keySet();
+ for (String key : topicGroupKey) {
+ if (key.contains(topic)) {
+ Pair<String, String> topicAndGroup = splitKey(key);
+ if (topicAndGroup != null && topicAndGroup.getObject1().equals(topic)) {
+ this.topicInFlightMessageNum.remove(key);
+ log.info("PopInflightMessageCounter#clearInFlightMessageNumByTopicName: clean by topic, topic={}, group={}",
+ topicAndGroup.getObject1(), topicAndGroup.getObject2());
+ }
+ }
+ }
+ }
+
+ public void clearInFlightMessageNum(String topic, String group, int queueId) {
+ topicInFlightMessageNum.computeIfPresent(buildKey(topic, group), (key, queueNum) -> {
+ queueNum.computeIfPresent(queueId, (queueIdKey, counter) -> null);
+ if (queueNum.isEmpty()) {
+ return null;
+ }
+ return queueNum;
+ });
+ }
+
+ public long getGroupPopInFlightMessageNum(String topic, String group, int queueId) {
+ Map<Integer /* queueId */, AtomicLong> queueCounter = topicInFlightMessageNum.get(buildKey(topic, group));
+ if (queueCounter == null) {
+ return 0;
+ }
+ AtomicLong counter = queueCounter.get(queueId);
+ if (counter == null) {
+ return 0;
+ }
+ return Math.max(0, counter.get());
+ }
+
+ private static Pair<String /* topic */, String /* group */> splitKey(String key) {
+ String[] strings = key.split(TOPIC_GROUP_SEPARATOR);
+ if (strings.length != 2) {
+ return null;
+ }
+ return new Pair<>(strings[0], strings[1]);
+ }
+
+ private static String buildKey(String topic, String group) {
+ return topic + TOPIC_GROUP_SEPARATOR + group;
+ }
+}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 6cd3e55fa..393631e45 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -512,6 +512,14 @@ public class PopMessageProcessor implements NettyRequestProcessor {
return this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum;
}
+ if (isOrder) {
+ this.brokerController.getPopInflightMessageCounter().clearInFlightMessageNum(
+ topic,
+ requestHeader.getConsumerGroup(),
+ queueId
+ );
+ }
+
if (getMessageResult.getMessageMapedList().size() >= requestHeader.getMaxMsgNums()) {
restNum =
this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum;
@@ -619,6 +627,12 @@ public class PopMessageProcessor implements NettyRequestProcessor {
}
}
}
+ this.brokerController.getPopInflightMessageCounter().incrementInFlightMessageNum(
+ topic,
+ requestHeader.getConsumerGroup(),
+ queueId,
+ getMessageTmpResult.getMessageCount()
+ );
}
return restNum;
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
index 9363a7204..1d0d53293 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
@@ -124,6 +124,7 @@ public class PopReviveService extends ServiceThread {
putMessageResult.getAppendMessageResult().getStatus() != AppendMessageStatus.PUT_OK) {
throw new Exception("reviveQueueId=" + queueId + ", revive error, msg is: " + msgInner);
}
+ this.brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(popCheckPoint);
this.brokerController.getBrokerStatsManager().incBrokerPutNums(1);
this.brokerController.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic());
this.brokerController.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getWroteBytes());
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopInflightMessageCounterTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopInflightMessageCounterTest.java
new file mode 100644
index 000000000..3b509196b
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopInflightMessageCounterTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.processor;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
+import org.apache.rocketmq.store.pop.PopCheckPoint;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class PopInflightMessageCounterTest {
+
+ @Test
+ public void testNum() {
+ BrokerController brokerController = mock(BrokerController.class);
+ long brokerStartTime = System.currentTimeMillis();
+ when(brokerController.getShouldStartTime()).thenReturn(brokerStartTime);
+ PopInflightMessageCounter counter = new PopInflightMessageCounter(brokerController);
+
+ final String topic = "topic";
+ final String group = "group";
+
+ assertEquals(0, counter.getGroupPopInFlightMessageNum(topic, group, 0));
+
+ counter.incrementInFlightMessageNum(topic, group, 0, 3);
+ assertEquals(3, counter.getGroupPopInFlightMessageNum(topic, group, 0));
+
+ counter.decrementInFlightMessageNum(topic, group, ExtraInfoUtil.buildExtraInfo(0, System.currentTimeMillis(),
+ 0, 0, topic, "broker", 0));
+ assertEquals(2, counter.getGroupPopInFlightMessageNum(topic, group, 0));
+
+ counter.decrementInFlightMessageNum(topic, group, ExtraInfoUtil.buildExtraInfo(0, System.currentTimeMillis() - 1000,
+ 0, 0, topic, "broker", 0));
+ assertEquals(2, counter.getGroupPopInFlightMessageNum(topic, group, 0));
+
+ PopCheckPoint popCheckPoint = new PopCheckPoint();
+ popCheckPoint.setTopic(topic);
+ popCheckPoint.setCId(group);
+ popCheckPoint.setQueueId((byte) 0);
+ popCheckPoint.setPopTime(System.currentTimeMillis());
+
+ counter.decrementInFlightMessageNum(popCheckPoint);
+ assertEquals(1, counter.getGroupPopInFlightMessageNum(topic, group, 0));
+
+ counter.decrementInFlightMessageNum(topic, group, ExtraInfoUtil.buildExtraInfo(0, System.currentTimeMillis(),
+ 0, 0, topic, "broker", 0));
+ assertEquals(0, counter.getGroupPopInFlightMessageNum(topic, group, 0));
+
+ counter.decrementInFlightMessageNum(topic, group, ExtraInfoUtil.buildExtraInfo(0, System.currentTimeMillis(),
+ 0, 0, topic, "broker", 0));
+ assertEquals(0, counter.getGroupPopInFlightMessageNum(topic, group, 0));
+ }
+
+ @Test
+ public void testClearInFlightMessageNum() {
+ BrokerController brokerController = mock(BrokerController.class);
+ long brokerStartTime = System.currentTimeMillis();
+ when(brokerController.getShouldStartTime()).thenReturn(brokerStartTime);
+ PopInflightMessageCounter counter = new PopInflightMessageCounter(brokerController);
+
+ final String topic = "topic";
+ final String group = "group";
+
+ assertEquals(0, counter.getGroupPopInFlightMessageNum(topic, group, 0));
+
+ counter.incrementInFlightMessageNum(topic, group, 0, 3);
+ assertEquals(3, counter.getGroupPopInFlightMessageNum(topic, group, 0));
+
+ counter.clearInFlightMessageNumByTopicName("errorTopic");
+ assertEquals(3, counter.getGroupPopInFlightMessageNum(topic, group, 0));
+
+ counter.clearInFlightMessageNumByTopicName(topic);
+ assertEquals(0, counter.getGroupPopInFlightMessageNum(topic, group, 0));
+
+ counter.incrementInFlightMessageNum(topic, group, 0, 3);
+ assertEquals(3, counter.getGroupPopInFlightMessageNum(topic, group, 0));
+
+ counter.clearInFlightMessageNumByGroupName("errorGroup");
+ assertEquals(3, counter.getGroupPopInFlightMessageNum(topic, group, 0));
+
+ counter.clearInFlightMessageNumByGroupName(group);
+ assertEquals(0, counter.getGroupPopInFlightMessageNum(topic, group, 0));
+ }
+}
\ No newline at end of file