You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2023/05/24 03:22:42 UTC
[rocketmq] branch develop updated: [ISSUE #6797]Support batch ack when reput buffer ak to store in PopBufferMergeService (#6798)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 985319ba2a [ISSUE #6797]Support batch ack when reput buffer ak to store in PopBufferMergeService (#6798)
985319ba2a is described below
commit 985319ba2abd633540133220ce2d95bb7dde0ad8
Author: Dongyuan Pan <do...@alibaba-inc.com>
AuthorDate: Wed May 24 11:22:18 2023 +0800
[ISSUE #6797]Support batch ack when reput buffer ak to store in PopBufferMergeService (#6798)
* add back for PopReviveService
* add batch ack for PopReviveService
---
.../broker/processor/PopBufferMergeService.java | 80 ++++++++++++++++++++--
.../broker/processor/PopMessageProcessor.java | 10 +++
.../broker/processor/PopReviveService.java | 58 ++++++++++++----
.../org/apache/rocketmq/common/BrokerConfig.java | 9 +++
.../apache/rocketmq/common/PopAckConstants.java | 1 +
.../org/apache/rocketmq/store/pop/BatchAckMsg.java | 50 ++++++++++++++
.../apache/rocketmq/store/pop/BatchAckMsgTest.java | 57 +++++++++++++++
7 files changed, 246 insertions(+), 19 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
index 4d6359c1dc..c5889f5562 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
@@ -18,7 +18,9 @@ package org.apache.rocketmq.broker.processor;
import com.alibaba.fastjson.JSON;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
@@ -39,6 +41,7 @@ import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.pop.AckMsg;
+import org.apache.rocketmq.store.pop.BatchAckMsg;
import org.apache.rocketmq.store.pop.PopCheckPoint;
public class PopBufferMergeService extends ServiceThread {
@@ -59,6 +62,7 @@ public class PopBufferMergeService extends ServiceThread {
private final int countOfSecond1 = (int) (1000 / interval);
private final int countOfSecond30 = (int) (30 * 1000 / interval);
+ // Scratch list holding the ack indexes of one checkpoint on the batch-ack path; it is emptied after every use.
+ private final List<Byte> batchAckIndexList = new ArrayList<>(32);
private volatile boolean master = false;
public PopBufferMergeService(BrokerController brokerController, PopMessageProcessor popMessageProcessor) {
@@ -268,13 +272,36 @@ public class PopBufferMergeService extends ServiceThread {
continue;
}
- for (byte i = 0; i < point.getNum(); i++) {
- // reput buffer ak to store
- if (DataConverter.getBit(pointWrapper.getBits().get(), i)
- && !DataConverter.getBit(pointWrapper.getToStoreBits().get(), i)) {
- if (putAckToStore(pointWrapper, i)) {
- count++;
- markBitCAS(pointWrapper.getToStoreBits(), i);
+ if (brokerController.getBrokerConfig().isEnablePopBatchAck()) {
+ List<Byte> indexList = this.batchAckIndexList;
+ try {
+ for (byte i = 0; i < point.getNum(); i++) {
+ // reput buffer ak to store
+ if (DataConverter.getBit(pointWrapper.getBits().get(), i)
+ && !DataConverter.getBit(pointWrapper.getToStoreBits().get(), i)) {
+ indexList.add(i);
+ }
+ }
+ if (indexList.size() > 0) {
+ if (putBatchAckToStore(pointWrapper, indexList)) {
+ count += indexList.size();
+ for (Byte i : indexList) {
+ markBitCAS(pointWrapper.getToStoreBits(), i);
+ }
+ }
+ }
+ } finally {
+ indexList.clear();
+ }
+ } else {
+ for (byte i = 0; i < point.getNum(); i++) {
+ // reput buffer ak to store
+ if (DataConverter.getBit(pointWrapper.getBits().get(), i)
+ && !DataConverter.getBit(pointWrapper.getToStoreBits().get(), i)) {
+ if (putAckToStore(pointWrapper, i)) {
+ count++;
+ markBitCAS(pointWrapper.getToStoreBits(), i);
+ }
}
}
}
@@ -606,6 +633,45 @@ public class PopBufferMergeService extends ServiceThread {
return true;
}
+ private boolean putBatchAckToStore(final PopCheckPointWrapper pointWrapper, final List<Byte> msgIndexList) {
+ PopCheckPoint point = pointWrapper.getCk();
+ MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
+ final BatchAckMsg batchAckMsg = new BatchAckMsg();
+
+ for (Byte msgIndex : msgIndexList) {
+ batchAckMsg.getAckOffsetList().add(point.ackOffsetByIndex(msgIndex));
+ }
+ batchAckMsg.setStartOffset(point.getStartOffset());
+ batchAckMsg.setConsumerGroup(point.getCId());
+ batchAckMsg.setTopic(point.getTopic());
+ batchAckMsg.setQueueId(point.getQueueId());
+ batchAckMsg.setPopTime(point.getPopTime());
+ msgInner.setTopic(popMessageProcessor.reviveTopic);
+ msgInner.setBody(JSON.toJSONString(batchAckMsg).getBytes(DataConverter.charset));
+ msgInner.setQueueId(pointWrapper.getReviveQueueId());
+ msgInner.setTags(PopAckConstants.BATCH_ACK_TAG);
+ msgInner.setBornTimestamp(System.currentTimeMillis());
+ msgInner.setBornHost(brokerController.getStoreHost());
+ msgInner.setStoreHost(brokerController.getStoreHost());
+ msgInner.setDeliverTimeMs(point.getReviveTime());
+ msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, PopMessageProcessor.genBatchAckUniqueId(batchAckMsg));
+
+ msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
+ PutMessageResult putMessageResult = brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
+ if (putMessageResult.getPutMessageStatus() != PutMessageStatus.PUT_OK
+ && putMessageResult.getPutMessageStatus() != PutMessageStatus.FLUSH_DISK_TIMEOUT
+ && putMessageResult.getPutMessageStatus() != PutMessageStatus.FLUSH_SLAVE_TIMEOUT
+ && putMessageResult.getPutMessageStatus() != PutMessageStatus.SLAVE_NOT_AVAILABLE) {
+ POP_LOGGER.error("[PopBuffer]put batch ack to store fail: {}, {}, {}", pointWrapper, batchAckMsg, putMessageResult);
+ return false;
+ }
+ if (brokerController.getBrokerConfig().isEnablePopLog()) {
+ POP_LOGGER.info("[PopBuffer]put batch ack to store ok: {}, {}, {}", pointWrapper, batchAckMsg, putMessageResult);
+ }
+
+ return true;
+ }
+
private boolean cancelCkTimer(final PopCheckPointWrapper pointWrapper) {
// not stored, no need cancel
if (pointWrapper.getReviveQueueOffset() < 0) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index efa07c2eff..28549bfedc 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -82,6 +82,7 @@ import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.pop.AckMsg;
+import org.apache.rocketmq.store.pop.BatchAckMsg;
import org.apache.rocketmq.store.pop.PopCheckPoint;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
@@ -136,6 +137,15 @@ public class PopMessageProcessor implements NettyRequestProcessor {
+ PopAckConstants.SPLIT + PopAckConstants.ACK_TAG;
}
+ /**
+  * Builds the unique id string of a batch-ack message by joining, with
+  * {@link PopAckConstants#SPLIT}: topic, queue id, the ack offset list, consumer group,
+  * pop time and the batch-ack tag.
+  *
+  * @param batchAckMsg the batch ack to derive the id from
+  * @return the joined id string
+  */
+ public static String genBatchAckUniqueId(BatchAckMsg batchAckMsg) {
+     final StringBuilder id = new StringBuilder();
+     id.append(batchAckMsg.getTopic()).append(PopAckConstants.SPLIT);
+     id.append(batchAckMsg.getQueueId()).append(PopAckConstants.SPLIT);
+     id.append(batchAckMsg.getAckOffsetList().toString()).append(PopAckConstants.SPLIT);
+     id.append(batchAckMsg.getConsumerGroup()).append(PopAckConstants.SPLIT);
+     id.append(batchAckMsg.getPopTime()).append(PopAckConstants.SPLIT);
+     id.append(PopAckConstants.BATCH_ACK_TAG);
+     return id.toString();
+ }
+
public static String genCkUniqueId(PopCheckPoint ck) {
return ck.getTopic()
+ PopAckConstants.SPLIT + ck.getQueueId()
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
index d6ce39c290..93167db373 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
@@ -54,6 +54,7 @@ import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.pop.AckMsg;
+import org.apache.rocketmq.store.pop.BatchAckMsg;
import org.apache.rocketmq.store.pop.PopCheckPoint;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
@@ -382,18 +383,8 @@ public class PopReviveService extends ServiceThread {
if (!brokerController.getBrokerConfig().isEnableSkipLongAwaitingAck()) {
continue;
}
- long ackWaitTime = System.currentTimeMillis() - messageExt.getDeliverTimeMs();
- long reviveAckWaitMs = brokerController.getBrokerConfig().getReviveAckWaitMs();
- if (ackWaitTime > reviveAckWaitMs) {
- // will use the reviveOffset of popCheckPoint to commit offset in mergeAndRevive
- PopCheckPoint mockPoint = createMockCkForAck(ackMsg, messageExt.getQueueOffset());
- POP_LOGGER.warn(
- "ack wait for {}ms cannot find ck, skip this ack. mergeKey:{}, ack:{}, mockCk:{}",
- reviveAckWaitMs, mergeKey, ackMsg, mockPoint);
- mockPointMap.put(mergeKey, mockPoint);
- if (firstRt == 0) {
- firstRt = mockPoint.getReviveTime();
- }
+ if (mockCkForAck(messageExt, ackMsg, mergeKey, mockPointMap) && firstRt == 0) {
+ firstRt = mockPointMap.get(mergeKey).getReviveTime();
}
} else {
int indexOfAck = point.indexOfAck(ackMsg.getAckOffset());
@@ -403,6 +394,34 @@ public class PopReviveService extends ServiceThread {
POP_LOGGER.error("invalid ack index, {}, {}", ackMsg, point);
}
}
+ } else if (PopAckConstants.BATCH_ACK_TAG.equals(messageExt.getTags())) {
+ String raw = new String(messageExt.getBody(), DataConverter.charset);
+ if (brokerController.getBrokerConfig().isEnablePopLog()) {
+ POP_LOGGER.info("reviveQueueId={}, find batch ack, offset:{}, raw : {}", messageExt.getQueueId(), messageExt.getQueueOffset(), raw);
+ }
+
+ BatchAckMsg bAckMsg = JSON.parseObject(raw, BatchAckMsg.class);
+ PopMetricsManager.incPopReviveAckGetCount(bAckMsg, queueId);
+ String mergeKey = bAckMsg.getTopic() + bAckMsg.getConsumerGroup() + bAckMsg.getQueueId() + bAckMsg.getStartOffset() + bAckMsg.getPopTime();
+ PopCheckPoint point = map.get(mergeKey);
+ if (point == null) {
+ if (!brokerController.getBrokerConfig().isEnableSkipLongAwaitingAck()) {
+ continue;
+ }
+ if (mockCkForAck(messageExt, bAckMsg, mergeKey, mockPointMap) && firstRt == 0) {
+ firstRt = mockPointMap.get(mergeKey).getReviveTime();
+ }
+ } else {
+ List<Long> ackOffsetList = bAckMsg.getAckOffsetList();
+ for (Long ackOffset : ackOffsetList) {
+ int indexOfAck = point.indexOfAck(ackOffset);
+ if (indexOfAck > -1) {
+ point.setBitMap(DataConverter.setBit(point.getBitMap(), indexOfAck, true));
+ } else {
+ POP_LOGGER.error("invalid batch ack index, {}, {}", bAckMsg, point);
+ }
+ }
+ }
}
long deliverTime = messageExt.getDeliverTimeMs();
if (deliverTime > endTime) {
@@ -415,6 +434,21 @@ public class PopReviveService extends ServiceThread {
consumeReviveObj.endTime = endTime;
}
+ private boolean mockCkForAck(MessageExt messageExt, AckMsg ackMsg, String mergeKey, HashMap<String, PopCheckPoint> mockPointMap) {
+ long ackWaitTime = System.currentTimeMillis() - messageExt.getDeliverTimeMs();
+ long reviveAckWaitMs = brokerController.getBrokerConfig().getReviveAckWaitMs();
+ if (ackWaitTime > reviveAckWaitMs) {
+ // will use the reviveOffset of popCheckPoint to commit offset in mergeAndRevive
+ PopCheckPoint mockPoint = createMockCkForAck(ackMsg, messageExt.getQueueOffset());
+ POP_LOGGER.warn(
+ "ack wait for {}ms cannot find ck, skip this ack. mergeKey:{}, ack:{}, mockCk:{}",
+ reviveAckWaitMs, mergeKey, ackMsg, mockPoint);
+ mockPointMap.put(mergeKey, mockPoint);
+ return true;
+ }
+ return false;
+ }
+
private PopCheckPoint createMockCkForAck(AckMsg ackMsg, long reviveOffset) {
PopCheckPoint point = new PopCheckPoint();
point.setStartOffset(ackMsg.getStartOffset());
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 07640232fb..2ce63a1f49 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -218,6 +218,7 @@ public class BrokerConfig extends BrokerIdentity {
private int popCkStayBufferTimeOut = 3 * 1000;
private int popCkMaxBufferSize = 200000;
private int popCkOffsetMaxQueueSize = 20000;
+ private boolean enablePopBatchAck = false;
private boolean enableNotifyAfterPopOrderLockRelease = true;
private boolean realTimeNotifyConsumerChange = true;
@@ -499,6 +500,14 @@ public class BrokerConfig extends BrokerIdentity {
this.popCkOffsetMaxQueueSize = popCkOffsetMaxQueueSize;
}
+ /**
+  * @return the current value of the {@code enablePopBatchAck} switch
+  */
+ public boolean isEnablePopBatchAck() {
+     return this.enablePopBatchAck;
+ }
+
+ /**
+  * Sets the {@code enablePopBatchAck} switch.
+  *
+  * @param enablePopBatchAck the new value of the switch
+  */
+ public void setEnablePopBatchAck(boolean enablePopBatchAck) {
+     this.enablePopBatchAck = enablePopBatchAck;
+ }
+
public boolean isEnableSkipLongAwaitingAck() {
return enableSkipLongAwaitingAck;
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/PopAckConstants.java b/common/src/main/java/org/apache/rocketmq/common/PopAckConstants.java
index ac5a1a17ed..17bc61578f 100644
--- a/common/src/main/java/org/apache/rocketmq/common/PopAckConstants.java
+++ b/common/src/main/java/org/apache/rocketmq/common/PopAckConstants.java
@@ -30,6 +30,7 @@ public class PopAckConstants {
public static final String REVIVE_TOPIC = TopicValidator.SYSTEM_TOPIC_PREFIX + "REVIVE_LOG_";
public static final String CK_TAG = "ck";
public static final String ACK_TAG = "ack";
+ public static final String BATCH_ACK_TAG = "bAck";
public static final String SPLIT = "@";
/**
diff --git a/store/src/main/java/org/apache/rocketmq/store/pop/BatchAckMsg.java b/store/src/main/java/org/apache/rocketmq/store/pop/BatchAckMsg.java
new file mode 100644
index 0000000000..991a1f085d
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/pop/BatchAckMsg.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.pop;
+
+import com.alibaba.fastjson.annotation.JSONField;
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * An {@link AckMsg} that carries a list of acked offsets.
+ */
+public class BatchAckMsg extends AckMsg {
+    // JSON name is "aol"; "ackOffsetList" is declared as an alternate name.
+    @JSONField(name = "aol", alternateNames = {"ackOffsetList"})
+    private List<Long> ackOffsetList = new ArrayList<>(32);
+
+
+    /**
+     * @return the list of acked offsets held by this message
+     */
+    public List<Long> getAckOffsetList() {
+        return ackOffsetList;
+    }
+
+    /**
+     * @param ackOffsetList the list of acked offsets to hold; replaces the current list
+     */
+    public void setAckOffsetList(List<Long> ackOffsetList) {
+        this.ackOffsetList = ackOffsetList;
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder sb = new StringBuilder("BatchAckMsg{");
+        sb.append("ackOffsetList=").append(ackOffsetList);
+        sb.append(", startOffset=").append(getStartOffset());
+        sb.append(", consumerGroup='").append(getConsumerGroup()).append('\'');
+        sb.append(", topic='").append(getTopic()).append('\'');
+        sb.append(", queueId=").append(getQueueId());
+        sb.append(", popTime=").append(getPopTime());
+        sb.append(", brokerName=").append(getBrokerName());
+        sb.append('}');
+        return sb.toString();
+    }
+}
diff --git a/store/src/test/java/org/apache/rocketmq/store/pop/BatchAckMsgTest.java b/store/src/test/java/org/apache/rocketmq/store/pop/BatchAckMsgTest.java
new file mode 100644
index 0000000000..4bcfcf18be
--- /dev/null
+++ b/store/src/test/java/org/apache/rocketmq/store/pop/BatchAckMsgTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.pop;
+
+import com.alibaba.fastjson.JSON;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class BatchAckMsgTest {
+
+    /**
+     * Serializes a populated {@link BatchAckMsg}, parses it back, and compares every field with
+     * a message parsed from hand-written JSON that uses the "ackOffsetList" key.
+     */
+    @Test
+    public void testSerializeAndDeSerialize() {
+        // Build the message programmatically.
+        List<Long> offsets = new ArrayList<>(32);
+        offsets.add(100L);
+        offsets.add(101L);
+
+        BatchAckMsg source = new BatchAckMsg();
+        source.setAckOffsetList(offsets);
+        source.setStartOffset(200L);
+        source.setConsumerGroup("group");
+        source.setTopic("topic");
+        source.setQueueId(3);
+        source.setPopTime(1679454922000L);
+
+        // Hand-written JSON carrying the same values.
+        String longString = "{\"ackOffsetList\":[100, 101],\"consumerGroup\":\"group\"," +
+            "\"popTime\":1679454922000,\"queueId\":3,\"startOffset\":200,\"topic\":\"topic\"}";
+
+        BatchAckMsg roundTripped = JSON.parseObject(JSON.toJSONString(source), BatchAckMsg.class);
+        BatchAckMsg fromLongJson = JSON.parseObject(longString, BatchAckMsg.class);
+
+        Assert.assertEquals(roundTripped.getAckOffsetList(), fromLongJson.getAckOffsetList());
+        Assert.assertEquals(roundTripped.getTopic(), fromLongJson.getTopic());
+        Assert.assertEquals(roundTripped.getConsumerGroup(), fromLongJson.getConsumerGroup());
+        Assert.assertEquals(roundTripped.getQueueId(), fromLongJson.getQueueId());
+        Assert.assertEquals(roundTripped.getStartOffset(), fromLongJson.getStartOffset());
+        Assert.assertEquals(roundTripped.getPopTime(), fromLongJson.getPopTime());
+    }
+}