You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2023/05/24 03:22:42 UTC

[rocketmq] branch develop updated: [ISSUE #6797]Support batch ack when reput buffer ak to store in PopBufferMergeService (#6798)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 985319ba2a [ISSUE #6797]Support batch ack when reput buffer ak to store in PopBufferMergeService (#6798)
985319ba2a is described below

commit 985319ba2abd633540133220ce2d95bb7dde0ad8
Author: Dongyuan Pan <do...@alibaba-inc.com>
AuthorDate: Wed May 24 11:22:18 2023 +0800

    [ISSUE #6797]Support batch ack when reput buffer ak to store in PopBufferMergeService (#6798)
    
    * add back for PopReviveService
    
    * add batch ack for PopReviveService
---
 .../broker/processor/PopBufferMergeService.java    | 80 ++++++++++++++++++++--
 .../broker/processor/PopMessageProcessor.java      | 10 +++
 .../broker/processor/PopReviveService.java         | 58 ++++++++++++----
 .../org/apache/rocketmq/common/BrokerConfig.java   |  9 +++
 .../apache/rocketmq/common/PopAckConstants.java    |  1 +
 .../org/apache/rocketmq/store/pop/BatchAckMsg.java | 50 ++++++++++++++
 .../apache/rocketmq/store/pop/BatchAckMsgTest.java | 57 +++++++++++++++
 7 files changed, 246 insertions(+), 19 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
index 4d6359c1dc..c5889f5562 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
@@ -18,7 +18,9 @@ package org.apache.rocketmq.broker.processor;
 
 import com.alibaba.fastjson.JSON;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingDeque;
@@ -39,6 +41,7 @@ import org.apache.rocketmq.store.PutMessageResult;
 import org.apache.rocketmq.store.PutMessageStatus;
 import org.apache.rocketmq.store.config.BrokerRole;
 import org.apache.rocketmq.store.pop.AckMsg;
+import org.apache.rocketmq.store.pop.BatchAckMsg;
 import org.apache.rocketmq.store.pop.PopCheckPoint;
 
 public class PopBufferMergeService extends ServiceThread {
@@ -59,6 +62,7 @@ public class PopBufferMergeService extends ServiceThread {
     private final int countOfSecond1 = (int) (1000 / interval);
     private final int countOfSecond30 = (int) (30 * 1000 / interval);
 
+    private final List<Byte> batchAckIndexList = new ArrayList(32);
     private volatile boolean master = false;
 
     public PopBufferMergeService(BrokerController brokerController, PopMessageProcessor popMessageProcessor) {
@@ -268,13 +272,36 @@ public class PopBufferMergeService extends ServiceThread {
                     continue;
                 }
 
-                for (byte i = 0; i < point.getNum(); i++) {
-                    // reput buffer ak to store
-                    if (DataConverter.getBit(pointWrapper.getBits().get(), i)
-                        && !DataConverter.getBit(pointWrapper.getToStoreBits().get(), i)) {
-                        if (putAckToStore(pointWrapper, i)) {
-                            count++;
-                            markBitCAS(pointWrapper.getToStoreBits(), i);
+                if (brokerController.getBrokerConfig().isEnablePopBatchAck()) {
+                    List<Byte> indexList = this.batchAckIndexList;
+                    try {
+                        for (byte i = 0; i < point.getNum(); i++) {
+                            // reput buffer ak to store
+                            if (DataConverter.getBit(pointWrapper.getBits().get(), i)
+                                    && !DataConverter.getBit(pointWrapper.getToStoreBits().get(), i)) {
+                                indexList.add(i);
+                            }
+                        }
+                        if (indexList.size() > 0) {
+                            if (putBatchAckToStore(pointWrapper, indexList)) {
+                                count += indexList.size();
+                                for (Byte i : indexList) {
+                                    markBitCAS(pointWrapper.getToStoreBits(), i);
+                                }
+                            }
+                        }
+                    } finally {
+                        indexList.clear();
+                    }
+                } else {
+                    for (byte i = 0; i < point.getNum(); i++) {
+                        // reput buffer ak to store
+                        if (DataConverter.getBit(pointWrapper.getBits().get(), i)
+                                && !DataConverter.getBit(pointWrapper.getToStoreBits().get(), i)) {
+                            if (putAckToStore(pointWrapper, i)) {
+                                count++;
+                                markBitCAS(pointWrapper.getToStoreBits(), i);
+                            }
                         }
                     }
                 }
@@ -606,6 +633,45 @@ public class PopBufferMergeService extends ServiceThread {
         return true;
     }
 
+    private boolean putBatchAckToStore(final PopCheckPointWrapper pointWrapper, final List<Byte> msgIndexList) {
+        PopCheckPoint point = pointWrapper.getCk();
+        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
+        final BatchAckMsg batchAckMsg = new BatchAckMsg();
+
+        for (Byte msgIndex : msgIndexList) {
+            batchAckMsg.getAckOffsetList().add(point.ackOffsetByIndex(msgIndex));
+        }
+        batchAckMsg.setStartOffset(point.getStartOffset());
+        batchAckMsg.setConsumerGroup(point.getCId());
+        batchAckMsg.setTopic(point.getTopic());
+        batchAckMsg.setQueueId(point.getQueueId());
+        batchAckMsg.setPopTime(point.getPopTime());
+        msgInner.setTopic(popMessageProcessor.reviveTopic);
+        msgInner.setBody(JSON.toJSONString(batchAckMsg).getBytes(DataConverter.charset));
+        msgInner.setQueueId(pointWrapper.getReviveQueueId());
+        msgInner.setTags(PopAckConstants.BATCH_ACK_TAG);
+        msgInner.setBornTimestamp(System.currentTimeMillis());
+        msgInner.setBornHost(brokerController.getStoreHost());
+        msgInner.setStoreHost(brokerController.getStoreHost());
+        msgInner.setDeliverTimeMs(point.getReviveTime());
+        msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, PopMessageProcessor.genBatchAckUniqueId(batchAckMsg));
+
+        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
+        PutMessageResult putMessageResult = brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
+        if (putMessageResult.getPutMessageStatus() != PutMessageStatus.PUT_OK
+                && putMessageResult.getPutMessageStatus() != PutMessageStatus.FLUSH_DISK_TIMEOUT
+                && putMessageResult.getPutMessageStatus() != PutMessageStatus.FLUSH_SLAVE_TIMEOUT
+                && putMessageResult.getPutMessageStatus() != PutMessageStatus.SLAVE_NOT_AVAILABLE) {
+            POP_LOGGER.error("[PopBuffer]put batch ack to store fail: {}, {}, {}", pointWrapper, batchAckMsg, putMessageResult);
+            return false;
+        }
+        if (brokerController.getBrokerConfig().isEnablePopLog()) {
+            POP_LOGGER.info("[PopBuffer]put batch ack to store ok: {}, {}, {}", pointWrapper, batchAckMsg, putMessageResult);
+        }
+
+        return true;
+    }
+
     private boolean cancelCkTimer(final PopCheckPointWrapper pointWrapper) {
         // not stored, no need cancel
         if (pointWrapper.getReviveQueueOffset() < 0) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index efa07c2eff..28549bfedc 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -82,6 +82,7 @@ import org.apache.rocketmq.store.GetMessageResult;
 import org.apache.rocketmq.store.GetMessageStatus;
 import org.apache.rocketmq.store.SelectMappedBufferResult;
 import org.apache.rocketmq.store.pop.AckMsg;
+import org.apache.rocketmq.store.pop.BatchAckMsg;
 import org.apache.rocketmq.store.pop.PopCheckPoint;
 
 import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
@@ -136,6 +137,15 @@ public class PopMessageProcessor implements NettyRequestProcessor {
             + PopAckConstants.SPLIT + PopAckConstants.ACK_TAG;
     }
 
+    public static String genBatchAckUniqueId(BatchAckMsg batchAckMsg) {
+        return batchAckMsg.getTopic()
+                + PopAckConstants.SPLIT + batchAckMsg.getQueueId()
+                + PopAckConstants.SPLIT + batchAckMsg.getAckOffsetList().toString()
+                + PopAckConstants.SPLIT + batchAckMsg.getConsumerGroup()
+                + PopAckConstants.SPLIT + batchAckMsg.getPopTime()
+                + PopAckConstants.SPLIT + PopAckConstants.BATCH_ACK_TAG;
+    }
+
     public static String genCkUniqueId(PopCheckPoint ck) {
         return ck.getTopic()
             + PopAckConstants.SPLIT + ck.getQueueId()
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
index d6ce39c290..93167db373 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
@@ -54,6 +54,7 @@ import org.apache.rocketmq.store.GetMessageResult;
 import org.apache.rocketmq.store.GetMessageStatus;
 import org.apache.rocketmq.store.PutMessageResult;
 import org.apache.rocketmq.store.pop.AckMsg;
+import org.apache.rocketmq.store.pop.BatchAckMsg;
 import org.apache.rocketmq.store.pop.PopCheckPoint;
 
 import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
@@ -382,18 +383,8 @@ public class PopReviveService extends ServiceThread {
                         if (!brokerController.getBrokerConfig().isEnableSkipLongAwaitingAck()) {
                             continue;
                         }
-                        long ackWaitTime = System.currentTimeMillis() - messageExt.getDeliverTimeMs();
-                        long reviveAckWaitMs = brokerController.getBrokerConfig().getReviveAckWaitMs();
-                        if (ackWaitTime > reviveAckWaitMs) {
-                            // will use the reviveOffset of popCheckPoint to commit offset in mergeAndRevive
-                            PopCheckPoint mockPoint = createMockCkForAck(ackMsg, messageExt.getQueueOffset());
-                            POP_LOGGER.warn(
-                                "ack wait for {}ms cannot find ck, skip this ack. mergeKey:{}, ack:{}, mockCk:{}",
-                                reviveAckWaitMs, mergeKey, ackMsg, mockPoint);
-                            mockPointMap.put(mergeKey, mockPoint);
-                            if (firstRt == 0) {
-                                firstRt = mockPoint.getReviveTime();
-                            }
+                        if (mockCkForAck(messageExt, ackMsg, mergeKey, mockPointMap) && firstRt == 0) {
+                            firstRt = mockPointMap.get(mergeKey).getReviveTime();
                         }
                     } else {
                         int indexOfAck = point.indexOfAck(ackMsg.getAckOffset());
@@ -403,6 +394,34 @@ public class PopReviveService extends ServiceThread {
                             POP_LOGGER.error("invalid ack index, {}, {}", ackMsg, point);
                         }
                     }
+                } else if (PopAckConstants.BATCH_ACK_TAG.equals(messageExt.getTags())) {
+                    String raw = new String(messageExt.getBody(), DataConverter.charset);
+                    if (brokerController.getBrokerConfig().isEnablePopLog()) {
+                        POP_LOGGER.info("reviveQueueId={}, find batch ack, offset:{}, raw : {}", messageExt.getQueueId(), messageExt.getQueueOffset(), raw);
+                    }
+
+                    BatchAckMsg bAckMsg = JSON.parseObject(raw, BatchAckMsg.class);
+                    PopMetricsManager.incPopReviveAckGetCount(bAckMsg, queueId);
+                    String mergeKey = bAckMsg.getTopic() + bAckMsg.getConsumerGroup() + bAckMsg.getQueueId() + bAckMsg.getStartOffset() + bAckMsg.getPopTime();
+                    PopCheckPoint point = map.get(mergeKey);
+                    if (point == null) {
+                        if (!brokerController.getBrokerConfig().isEnableSkipLongAwaitingAck()) {
+                            continue;
+                        }
+                        if (mockCkForAck(messageExt, bAckMsg, mergeKey, mockPointMap) && firstRt == 0) {
+                            firstRt = mockPointMap.get(mergeKey).getReviveTime();
+                        }
+                    } else {
+                        List<Long> ackOffsetList = bAckMsg.getAckOffsetList();
+                        for (Long ackOffset : ackOffsetList) {
+                            int indexOfAck = point.indexOfAck(ackOffset);
+                            if (indexOfAck > -1) {
+                                point.setBitMap(DataConverter.setBit(point.getBitMap(), indexOfAck, true));
+                            } else {
+                                POP_LOGGER.error("invalid batch ack index, {}, {}", bAckMsg, point);
+                            }
+                        }
+                    }
                 }
                 long deliverTime = messageExt.getDeliverTimeMs();
                 if (deliverTime > endTime) {
@@ -415,6 +434,21 @@ public class PopReviveService extends ServiceThread {
         consumeReviveObj.endTime = endTime;
     }
 
+    private boolean mockCkForAck(MessageExt messageExt, AckMsg ackMsg, String mergeKey, HashMap<String, PopCheckPoint> mockPointMap) {
+        long ackWaitTime = System.currentTimeMillis() - messageExt.getDeliverTimeMs();
+        long reviveAckWaitMs = brokerController.getBrokerConfig().getReviveAckWaitMs();
+        if (ackWaitTime > reviveAckWaitMs) {
+            // will use the reviveOffset of popCheckPoint to commit offset in mergeAndRevive
+            PopCheckPoint mockPoint = createMockCkForAck(ackMsg, messageExt.getQueueOffset());
+            POP_LOGGER.warn(
+                    "ack wait for {}ms cannot find ck, skip this ack. mergeKey:{}, ack:{}, mockCk:{}",
+                    reviveAckWaitMs, mergeKey, ackMsg, mockPoint);
+            mockPointMap.put(mergeKey, mockPoint);
+            return true;
+        }
+        return false;
+    }
+
     private PopCheckPoint createMockCkForAck(AckMsg ackMsg, long reviveOffset) {
         PopCheckPoint point = new PopCheckPoint();
         point.setStartOffset(ackMsg.getStartOffset());
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 07640232fb..2ce63a1f49 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -218,6 +218,7 @@ public class BrokerConfig extends BrokerIdentity {
     private int popCkStayBufferTimeOut = 3 * 1000;
     private int popCkMaxBufferSize = 200000;
     private int popCkOffsetMaxQueueSize = 20000;
+    private boolean enablePopBatchAck = false;
     private boolean enableNotifyAfterPopOrderLockRelease = true;
 
     private boolean realTimeNotifyConsumerChange = true;
@@ -499,6 +500,14 @@ public class BrokerConfig extends BrokerIdentity {
         this.popCkOffsetMaxQueueSize = popCkOffsetMaxQueueSize;
     }
 
+    public boolean isEnablePopBatchAck() {
+        return enablePopBatchAck;
+    }
+
+    public void setEnablePopBatchAck(boolean enablePopBatchAck) {
+        this.enablePopBatchAck = enablePopBatchAck;
+    }
+
     public boolean isEnableSkipLongAwaitingAck() {
         return enableSkipLongAwaitingAck;
     }
diff --git a/common/src/main/java/org/apache/rocketmq/common/PopAckConstants.java b/common/src/main/java/org/apache/rocketmq/common/PopAckConstants.java
index ac5a1a17ed..17bc61578f 100644
--- a/common/src/main/java/org/apache/rocketmq/common/PopAckConstants.java
+++ b/common/src/main/java/org/apache/rocketmq/common/PopAckConstants.java
@@ -30,6 +30,7 @@ public class PopAckConstants {
     public static final String REVIVE_TOPIC = TopicValidator.SYSTEM_TOPIC_PREFIX + "REVIVE_LOG_";
     public static final String CK_TAG = "ck";
     public static final String ACK_TAG = "ack";
+    public static final String BATCH_ACK_TAG = "bAck";
     public static final String SPLIT = "@";
 
     /**
diff --git a/store/src/main/java/org/apache/rocketmq/store/pop/BatchAckMsg.java b/store/src/main/java/org/apache/rocketmq/store/pop/BatchAckMsg.java
new file mode 100644
index 0000000000..991a1f085d
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/pop/BatchAckMsg.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.pop;
+
+import com.alibaba.fastjson.annotation.JSONField;
+import java.util.ArrayList;
+import java.util.List;
+
+
+public class BatchAckMsg extends AckMsg {
+    @JSONField(name = "aol", alternateNames = {"ackOffsetList"})
+    private List<Long> ackOffsetList = new ArrayList(32);
+
+
+    public List<Long> getAckOffsetList() {
+        return ackOffsetList;
+    }
+
+    public void setAckOffsetList(List<Long> ackOffsetList) {
+        this.ackOffsetList = ackOffsetList;
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder sb = new StringBuilder("BatchAckMsg{");
+        sb.append("ackOffsetList=").append(ackOffsetList);
+        sb.append(", startOffset=").append(getStartOffset());
+        sb.append(", consumerGroup='").append(getConsumerGroup()).append('\'');
+        sb.append(", topic='").append(getTopic()).append('\'');
+        sb.append(", queueId=").append(getQueueId());
+        sb.append(", popTime=").append(getPopTime());
+        sb.append(", brokerName=").append(getBrokerName());
+        sb.append('}');
+        return sb.toString();
+    }
+}
diff --git a/store/src/test/java/org/apache/rocketmq/store/pop/BatchAckMsgTest.java b/store/src/test/java/org/apache/rocketmq/store/pop/BatchAckMsgTest.java
new file mode 100644
index 0000000000..4bcfcf18be
--- /dev/null
+++ b/store/src/test/java/org/apache/rocketmq/store/pop/BatchAckMsgTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store.pop;
+
+import com.alibaba.fastjson.JSON;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class BatchAckMsgTest {
+
+    @Test
+    public void testSerializeAndDeSerialize() {
+        String longString = "{\"ackOffsetList\":[100, 101],\"consumerGroup\":\"group\"," +
+                "\"popTime\":1679454922000,\"queueId\":3,\"startOffset\":200,\"topic\":\"topic\"}";
+
+        BatchAckMsg batchAckMsg = new BatchAckMsg();
+        List<Long> aol = new ArrayList<>(32);
+        aol.add(100L);
+        aol.add(101L);
+
+        batchAckMsg.setAckOffsetList(aol);
+        batchAckMsg.setStartOffset(200L);
+        batchAckMsg.setConsumerGroup("group");
+        batchAckMsg.setTopic("topic");
+        batchAckMsg.setQueueId(3);
+        batchAckMsg.setPopTime(1679454922000L);
+
+        String jsonString = JSON.toJSONString(batchAckMsg);
+        BatchAckMsg batchAckMsg1 = JSON.parseObject(jsonString, BatchAckMsg.class);
+        BatchAckMsg batchAckMsg2 = JSON.parseObject(longString, BatchAckMsg.class);
+
+        Assert.assertEquals(batchAckMsg1.getAckOffsetList(), batchAckMsg2.getAckOffsetList());
+        Assert.assertEquals(batchAckMsg1.getTopic(), batchAckMsg2.getTopic());
+        Assert.assertEquals(batchAckMsg1.getConsumerGroup(), batchAckMsg2.getConsumerGroup());
+        Assert.assertEquals(batchAckMsg1.getQueueId(), batchAckMsg2.getQueueId());
+        Assert.assertEquals(batchAckMsg1.getStartOffset(), batchAckMsg2.getStartOffset());
+        Assert.assertEquals(batchAckMsg1.getPopTime(), batchAckMsg2.getPopTime());
+    }
+}