You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/03/29 02:28:52 UTC

[rocketmq] branch pop_consumer updated: [RIP-19] Pop Consuming (store)

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch pop_consumer
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/pop_consumer by this push:
     new c29d5fd  [RIP-19] Pop Consuming (store)
     new ea36854  Merge pull request #2733 from ayanamist/pop_consumer
c29d5fd is described below

commit c29d5fd74e2b29cd84f141d278f193a58c2e3d24
Author: ayanamist <ay...@gmail.com>
AuthorDate: Tue Mar 9 11:09:10 2021 +0800

    [RIP-19] Pop Consuming (store)
---
 .../apache/rocketmq/store/DefaultMessageStore.java |   3 +-
 .../apache/rocketmq/store/GetMessageResult.java    |  10 ++
 .../org/apache/rocketmq/store/MessageStore.java    |   4 +-
 .../java/org/apache/rocketmq/store/pop/AckMsg.java |  87 +++++++++++
 .../apache/rocketmq/store/pop/PopCheckPoint.java   | 163 +++++++++++++++++++++
 .../store/schedule/ScheduleMessageService.java     |  20 ++-
 6 files changed, 282 insertions(+), 5 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 14a8848..bb92cc0 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -672,7 +672,7 @@ public class DefaultMessageStore implements MessageStore {
                             }
 
                             this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();
-                            getResult.addMessage(selectResult);
+                            getResult.addMessage(selectResult, offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE));
                             status = GetMessageStatus.FOUND;
                             nextPhyFileStartOffset = Long.MIN_VALUE;
                         }
@@ -1496,6 +1496,7 @@ public class DefaultMessageStore implements MessageStore {
         return haService;
     }
 
+    @Override
     public ScheduleMessageService getScheduleMessageService() {
         return scheduleMessageService;
     }
diff --git a/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java b/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java
index 996e24d..6fcb310 100644
--- a/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java
+++ b/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java
@@ -27,6 +27,7 @@ public class GetMessageResult {
         new ArrayList<SelectMappedBufferResult>(100);
 
     private final List<ByteBuffer> messageBufferList = new ArrayList<ByteBuffer>(100);
+    private final List<Long> messageQueueOffset = new ArrayList<>(100);
 
     private GetMessageStatus status;
     private long nextBeginOffset;
@@ -90,6 +91,11 @@ public class GetMessageResult {
             mapedBuffer.getSize() / BrokerStatsManager.SIZE_PER_COUNT);
     }
 
+    public void addMessage(final SelectMappedBufferResult mapedBuffer, final long queueOffset) {
+        addMessage(mapedBuffer);
+        this.messageQueueOffset.add(queueOffset);
+    }
+
     public void release() {
         for (SelectMappedBufferResult select : this.messageMapedList) {
             select.release();
@@ -124,6 +130,10 @@ public class GetMessageResult {
         this.msgCount4Commercial = msgCount4Commercial;
     }
 
+    public List<Long> getMessageQueueOffset() {
+        return messageQueueOffset;
+    }
+
     @Override
     public String toString() {
         return "GetMessageResult [status=" + status + ", nextBeginOffset=" + nextBeginOffset + ", minOffset="
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
index 64eb525..0cea607 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
@@ -20,10 +20,10 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageExtBatch;
 import org.apache.rocketmq.store.config.BrokerRole;
+import org.apache.rocketmq.store.schedule.ScheduleMessageService;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
 
 /**
@@ -383,6 +383,8 @@ public interface MessageStore {
      */
     ConsumeQueue getConsumeQueue(String topic, int queueId);
 
+    ScheduleMessageService getScheduleMessageService();
+
     /**
      * Get BrokerStatsManager of the messageStore.
      *
diff --git a/store/src/main/java/org/apache/rocketmq/store/pop/AckMsg.java b/store/src/main/java/org/apache/rocketmq/store/pop/AckMsg.java
new file mode 100644
index 0000000..ab017a9
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/pop/AckMsg.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.pop;
+
+public class AckMsg {
+    private long ackOffset;
+    private long startOffset;
+    private String consumerGroup;
+    private String topic;
+    private int queueId;
+    private long popTime;
+
+    public long getPopTime() {
+        return popTime;
+    }
+
+    public void setPopTime(long popTime) {
+        this.popTime = popTime;
+    }
+
+    public void setQueueId(int queueId) {
+        this.queueId = queueId;
+    }
+
+    public int getQueueId() {
+        return queueId;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public long getAckOffset() {
+        return ackOffset;
+    }
+
+    public String getConsumerGroup() {
+        return consumerGroup;
+    }
+
+    public void setConsumerGroup(String consumerGroup) {
+        this.consumerGroup = consumerGroup;
+    }
+
+    public void setAckOffset(long ackOffset) {
+        this.ackOffset = ackOffset;
+    }
+
+    public long getStartOffset() {
+        return startOffset;
+    }
+
+    public void setStartOffset(long startOffset) {
+        this.startOffset = startOffset;
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder sb = new StringBuilder("AckMsg{");
+        sb.append("ackOffset=").append(ackOffset);
+        sb.append(", startOffset=").append(startOffset);
+        sb.append(", consumerGroup='").append(consumerGroup).append('\'');
+        sb.append(", topic='").append(topic).append('\'');
+        sb.append(", queueId=").append(queueId);
+        sb.append(", popTime=").append(popTime);
+        sb.append('}');
+        return sb.toString();
+    }
+}
diff --git a/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java b/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java
new file mode 100644
index 0000000..f5b7ac6
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.pop;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class PopCheckPoint {
+    private long startOffset;
+    private long popTime;
+    private long invisibleTime;
+    private int bitMap;
+    private byte num;
+    private byte queueId;
+    private String topic;
+    private String cid;
+    private long reviveOffset;
+    private List<Integer> queueOffsetDiff;
+
+    public long getReviveOffset() {
+        return reviveOffset;
+    }
+
+    public void setReviveOffset(long reviveOffset) {
+        this.reviveOffset = reviveOffset;
+    }
+
+    public long getStartOffset() {
+        return startOffset;
+    }
+
+    public void setStartOffset(long startOffset) {
+        this.startOffset = startOffset;
+    }
+
+    public void getStartOffset(long startOffset) {
+        this.startOffset = startOffset;
+    }
+
+    public void setPopTime(long popTime) {
+        this.popTime = popTime;
+    }
+
+    public void setInvisibleTime(long invisibleTime) {
+        this.invisibleTime = invisibleTime;
+    }
+
+    public long getPopTime() {
+        return popTime;
+    }
+
+    public long getInvisibleTime() {
+        return invisibleTime;
+    }
+
+    public long getReviveTime() {
+        return popTime + invisibleTime;
+    }
+
+    public int getBitMap() {
+        return bitMap;
+    }
+
+    public void setBitMap(int bitMap) {
+        this.bitMap = bitMap;
+    }
+
+    public byte getNum() {
+        return num;
+    }
+
+    public void setNum(byte num) {
+        this.num = num;
+    }
+
+    public byte getQueueId() {
+        return queueId;
+    }
+
+    public void setQueueId(byte queueId) {
+        this.queueId = queueId;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public String getCId() {
+        return cid;
+    }
+
+    public void setCId(String cid) {
+        this.cid = cid;
+    }
+
+    public List<Integer> getQueueOffsetDiff() {
+        return queueOffsetDiff;
+    }
+
+    public void setQueueOffsetDiff(List<Integer> queueOffsetDiff) {
+        this.queueOffsetDiff = queueOffsetDiff;
+    }
+
+    public void addDiff(int diff) {
+        if (this.queueOffsetDiff == null) {
+            this.queueOffsetDiff = new ArrayList<>(8);
+        }
+        this.queueOffsetDiff.add(diff);
+    }
+
+    public int indexOfAck(long ackOffset) {
+        if (ackOffset < startOffset) {
+            return -1;
+        }
+
+        // old version of checkpoint
+        if (queueOffsetDiff == null || queueOffsetDiff.isEmpty()) {
+
+            if (ackOffset - startOffset < num) {
+                return (int) (ackOffset - startOffset);
+            }
+
+            return -1;
+        }
+
+        // new version of checkpoint
+        return queueOffsetDiff.indexOf((int) (ackOffset - startOffset));
+    }
+
+    public long ackOffsetByIndex(byte index) {
+        // old version of checkpoint
+        if (queueOffsetDiff == null || queueOffsetDiff.isEmpty()) {
+            return startOffset + index;
+        }
+
+        return startOffset + queueOffsetDiff.get(index);
+    }
+
+    @Override
+    public String toString() {
+        return "PopCheckPoint [topic=" + topic + ", cid=" + cid + ", queueId=" + queueId + ", startOffset=" + startOffset + ", bitMap=" + bitMap + ", num=" + num + ", reviveTime=" + getReviveTime()
+            + ", reviveOffset=" + reviveOffset + ", diff=" + queueOffsetDiff + "]";
+    }
+
+}
diff --git a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
index 3b19a16..ee994c3 100644
--- a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
@@ -16,25 +16,28 @@
  */
 package org.apache.rocketmq.store.schedule;
 
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 import org.apache.rocketmq.common.ConfigManager;
 import org.apache.rocketmq.common.TopicFilterType;
 import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.topic.TopicValidator;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.common.message.MessageAccessor;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.running.RunningStats;
+import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.store.ConsumeQueue;
 import org.apache.rocketmq.store.ConsumeQueueExt;
 import org.apache.rocketmq.store.DefaultMessageStore;
@@ -221,6 +224,17 @@ public class ScheduleMessageService extends ConfigManager {
         return true;
     }
 
+    public int computeDelayLevel(long timeMillis) {
+        long intervalMillis = timeMillis - System.currentTimeMillis();
+        List<Map.Entry<Integer, Long>> sortedLevels = delayLevelTable.entrySet().stream().sorted(Comparator.comparingLong(Map.Entry::getValue)).collect(Collectors.toList());
+        for (Map.Entry<Integer, Long> entry : sortedLevels) {
+            if (entry.getValue() > intervalMillis) {
+                return entry.getKey();
+            }
+        }
+        return sortedLevels.get(sortedLevels.size() - 1).getKey();
+    }
+
     class DeliverDelayedMessageTimerTask extends TimerTask {
         private final int delayLevel;
         private final long offset;