You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/03/29 02:28:52 UTC
[rocketmq] branch pop_consumer updated: [RIP-19] Pop Consuming
(store)
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch pop_consumer
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/pop_consumer by this push:
new c29d5fd [RIP-19] Pop Consuming (store)
new ea36854 Merge pull request #2733 from ayanamist/pop_consumer
c29d5fd is described below
commit c29d5fd74e2b29cd84f141d278f193a58c2e3d24
Author: ayanamist <ay...@gmail.com>
AuthorDate: Tue Mar 9 11:09:10 2021 +0800
[RIP-19] Pop Consuming (store)
---
.../apache/rocketmq/store/DefaultMessageStore.java | 3 +-
.../apache/rocketmq/store/GetMessageResult.java | 10 ++
.../org/apache/rocketmq/store/MessageStore.java | 4 +-
.../java/org/apache/rocketmq/store/pop/AckMsg.java | 87 +++++++++++
.../apache/rocketmq/store/pop/PopCheckPoint.java | 163 +++++++++++++++++++++
.../store/schedule/ScheduleMessageService.java | 20 ++-
6 files changed, 282 insertions(+), 5 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 14a8848..bb92cc0 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -672,7 +672,7 @@ public class DefaultMessageStore implements MessageStore {
}
this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();
- getResult.addMessage(selectResult);
+ getResult.addMessage(selectResult, offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE));
status = GetMessageStatus.FOUND;
nextPhyFileStartOffset = Long.MIN_VALUE;
}
@@ -1496,6 +1496,7 @@ public class DefaultMessageStore implements MessageStore {
return haService;
}
+ /**
+  * Returns the {@code scheduleMessageService} field of this store.
+  */
+ @Override
public ScheduleMessageService getScheduleMessageService() {
return scheduleMessageService;
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java b/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java
index 996e24d..6fcb310 100644
--- a/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java
+++ b/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java
@@ -27,6 +27,7 @@ public class GetMessageResult {
new ArrayList<SelectMappedBufferResult>(100);
private final List<ByteBuffer> messageBufferList = new ArrayList<ByteBuffer>(100);
+ private final List<Long> messageQueueOffset = new ArrayList<>(100);
private GetMessageStatus status;
private long nextBeginOffset;
@@ -90,6 +91,11 @@ public class GetMessageResult {
mapedBuffer.getSize() / BrokerStatsManager.SIZE_PER_COUNT);
}
+ /**
+  * Adds a message buffer and records the queue offset that belongs to it.
+  * <p>
+  * Delegates to {@link #addMessage(SelectMappedBufferResult)} and then appends
+  * {@code queueOffset} to the {@code messageQueueOffset} list.
+  * NOTE(review): the offset list presumably stays index-aligned with the message
+  * lists only when every message is added through this overload - confirm callers.
+  *
+  * @param mapedBuffer the selected message buffer to add.
+  * @param queueOffset the queue offset to record for this message.
+  */
+ public void addMessage(final SelectMappedBufferResult mapedBuffer, final long queueOffset) {
+ addMessage(mapedBuffer);
+ this.messageQueueOffset.add(queueOffset);
+ }
+
public void release() {
for (SelectMappedBufferResult select : this.messageMapedList) {
select.release();
@@ -124,6 +130,10 @@ public class GetMessageResult {
this.msgCount4Commercial = msgCount4Commercial;
}
+ /**
+  * Returns the queue offsets recorded so far.
+  * <p>
+  * The internal list itself is returned, not a copy, so changes made by the
+  * caller are visible to this result object.
+  *
+  * @return the list of recorded queue offsets.
+  */
+ public List<Long> getMessageQueueOffset() {
+ return messageQueueOffset;
+ }
+
@Override
public String toString() {
return "GetMessageResult [status=" + status + ", nextBeginOffset=" + nextBeginOffset + ", minOffset="
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
index 64eb525..0cea607 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
@@ -20,10 +20,10 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBatch;
import org.apache.rocketmq.store.config.BrokerRole;
+import org.apache.rocketmq.store.schedule.ScheduleMessageService;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
/**
@@ -383,6 +383,8 @@ public interface MessageStore {
*/
ConsumeQueue getConsumeQueue(String topic, int queueId);
+ /**
+  * Get ScheduleMessageService of the messageStore.
+  *
+  * @return the ScheduleMessageService instance.
+  */
+ ScheduleMessageService getScheduleMessageService();
+
/**
* Get BrokerStatsManager of the messageStore.
*
diff --git a/store/src/main/java/org/apache/rocketmq/store/pop/AckMsg.java b/store/src/main/java/org/apache/rocketmq/store/pop/AckMsg.java
new file mode 100644
index 0000000..ab017a9
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/pop/AckMsg.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.pop;
+
+/**
+ * Mutable value object carrying the fields of one acknowledgement: the
+ * acknowledged offset, the start offset and pop time it refers to, and the
+ * consumer group / topic / queue it was issued for.
+ * <p>
+ * The class only stores the values; it performs no validation.
+ */
+public class AckMsg {
+    private long ackOffset;
+    private long startOffset;
+    private String consumerGroup;
+    private String topic;
+    private int queueId;
+    private long popTime;
+
+    /** @return the acknowledged offset. */
+    public long getAckOffset() {
+        return this.ackOffset;
+    }
+
+    /** @param ackOffset the acknowledged offset. */
+    public void setAckOffset(long ackOffset) {
+        this.ackOffset = ackOffset;
+    }
+
+    /** @return the start offset. */
+    public long getStartOffset() {
+        return this.startOffset;
+    }
+
+    /** @param startOffset the start offset. */
+    public void setStartOffset(long startOffset) {
+        this.startOffset = startOffset;
+    }
+
+    /** @return the consumer group name. */
+    public String getConsumerGroup() {
+        return this.consumerGroup;
+    }
+
+    /** @param consumerGroup the consumer group name. */
+    public void setConsumerGroup(String consumerGroup) {
+        this.consumerGroup = consumerGroup;
+    }
+
+    /** @return the topic name. */
+    public String getTopic() {
+        return this.topic;
+    }
+
+    /** @param topic the topic name. */
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    /** @return the queue id. */
+    public int getQueueId() {
+        return this.queueId;
+    }
+
+    /** @param queueId the queue id. */
+    public void setQueueId(int queueId) {
+        this.queueId = queueId;
+    }
+
+    /** @return the pop time. */
+    public long getPopTime() {
+        return this.popTime;
+    }
+
+    /** @param popTime the pop time. */
+    public void setPopTime(long popTime) {
+        this.popTime = popTime;
+    }
+
+    /**
+     * Renders all fields in declaration order; string fields are wrapped in
+     * single quotes.
+     */
+    @Override
+    public String toString() {
+        return "AckMsg{"
+            + "ackOffset=" + ackOffset
+            + ", startOffset=" + startOffset
+            + ", consumerGroup='" + consumerGroup + '\''
+            + ", topic='" + topic + '\''
+            + ", queueId=" + queueId
+            + ", popTime=" + popTime
+            + '}';
+    }
+}
diff --git a/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java b/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java
new file mode 100644
index 0000000..f5b7ac6
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.pop;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Mutable checkpoint describing one pop operation on a queue: where it started
+ * ({@code startOffset}), when it happened ({@code popTime}), how long the
+ * popped messages stay invisible ({@code invisibleTime}) and which offsets it
+ * covers.
+ * <p>
+ * Two layouts are supported. In the old layout {@code queueOffsetDiff} is
+ * {@code null} or empty and the checkpoint covers the {@code num} consecutive
+ * offsets starting at {@code startOffset}. In the new layout every covered
+ * offset is stored in {@code queueOffsetDiff} as its distance from
+ * {@code startOffset}.
+ * <p>
+ * NOTE(review): {@code bitMap} presumably holds one bit per covered message -
+ * confirm against the code that consumes this class.
+ */
+public class PopCheckPoint {
+    private long startOffset;
+    private long popTime;
+    private long invisibleTime;
+    private int bitMap;
+    private byte num;
+    private byte queueId;
+    private String topic;
+    private String cid;
+    private long reviveOffset;
+    private List<Integer> queueOffsetDiff;
+
+    public long getReviveOffset() {
+        return reviveOffset;
+    }
+
+    public void setReviveOffset(long reviveOffset) {
+        this.reviveOffset = reviveOffset;
+    }
+
+    public long getStartOffset() {
+        return startOffset;
+    }
+
+    public void setStartOffset(long startOffset) {
+        this.startOffset = startOffset;
+    }
+
+    /**
+     * Sets the start offset. Despite its name this method is a setter.
+     *
+     * @param startOffset the new start offset.
+     * @deprecated misnamed duplicate of {@link #setStartOffset(long)}; kept only
+     * so existing callers keep compiling.
+     */
+    @Deprecated
+    public void getStartOffset(long startOffset) {
+        setStartOffset(startOffset);
+    }
+
+    public void setPopTime(long popTime) {
+        this.popTime = popTime;
+    }
+
+    public void setInvisibleTime(long invisibleTime) {
+        this.invisibleTime = invisibleTime;
+    }
+
+    public long getPopTime() {
+        return popTime;
+    }
+
+    public long getInvisibleTime() {
+        return invisibleTime;
+    }
+
+    /**
+     * @return {@code popTime + invisibleTime}.
+     */
+    public long getReviveTime() {
+        return popTime + invisibleTime;
+    }
+
+    public int getBitMap() {
+        return bitMap;
+    }
+
+    public void setBitMap(int bitMap) {
+        this.bitMap = bitMap;
+    }
+
+    public byte getNum() {
+        return num;
+    }
+
+    public void setNum(byte num) {
+        this.num = num;
+    }
+
+    public byte getQueueId() {
+        return queueId;
+    }
+
+    public void setQueueId(byte queueId) {
+        this.queueId = queueId;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public String getCId() {
+        return cid;
+    }
+
+    public void setCId(String cid) {
+        this.cid = cid;
+    }
+
+    /**
+     * @return the internal list of offset distances (not a copy); {@code null}
+     * until a list is set or {@link #addDiff(int)} is called.
+     */
+    public List<Integer> getQueueOffsetDiff() {
+        return queueOffsetDiff;
+    }
+
+    public void setQueueOffsetDiff(List<Integer> queueOffsetDiff) {
+        this.queueOffsetDiff = queueOffsetDiff;
+    }
+
+    /**
+     * Appends one offset distance, creating the backing list on first use.
+     *
+     * @param diff distance of a covered offset from {@code startOffset}.
+     */
+    public void addDiff(int diff) {
+        if (this.queueOffsetDiff == null) {
+            this.queueOffsetDiff = new ArrayList<>(8);
+        }
+        this.queueOffsetDiff.add(diff);
+    }
+
+    /**
+     * Maps an acknowledged queue offset to its position inside this checkpoint.
+     *
+     * @param ackOffset the acknowledged queue offset.
+     * @return the zero-based index of {@code ackOffset} in this checkpoint, or
+     * {@code -1} when the offset is not covered by it.
+     */
+    public int indexOfAck(long ackOffset) {
+        if (ackOffset < startOffset) {
+            return -1;
+        }
+
+        final long distance = ackOffset - startOffset;
+        // A distance that overflowed long (negative) or does not fit in an int can
+        // never be covered: num is a byte and recorded diffs are ints. Rejecting it
+        // here keeps the int narrowing below from wrapping onto a recorded diff.
+        if (distance < 0 || distance > Integer.MAX_VALUE) {
+            return -1;
+        }
+
+        // old version of checkpoint
+        if (queueOffsetDiff == null || queueOffsetDiff.isEmpty()) {
+            return distance < num ? (int) distance : -1;
+        }
+
+        // new version of checkpoint
+        return queueOffsetDiff.indexOf(Integer.valueOf((int) distance));
+    }
+
+    /**
+     * Inverse of {@link #indexOfAck(long)}: resolves an index inside this
+     * checkpoint to the queue offset it stands for.
+     *
+     * @param index zero-based position inside the checkpoint.
+     * @return the queue offset at that position.
+     * @throws IndexOutOfBoundsException in the new layout, when {@code index} is
+     * outside the recorded diff list.
+     */
+    public long ackOffsetByIndex(byte index) {
+        // old version of checkpoint
+        if (queueOffsetDiff == null || queueOffsetDiff.isEmpty()) {
+            return startOffset + index;
+        }
+
+        return startOffset + queueOffsetDiff.get(index);
+    }
+
+    @Override
+    public String toString() {
+        return "PopCheckPoint [topic=" + topic + ", cid=" + cid + ", queueId=" + queueId + ", startOffset=" + startOffset + ", bitMap=" + bitMap + ", num=" + num + ", reviveTime=" + getReviveTime()
+            + ", reviveOffset=" + reviveOffset + ", diff=" + queueOffsetDiff + "]";
+    }
+
+}
diff --git a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
index 3b19a16..ee994c3 100644
--- a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
@@ -16,25 +16,28 @@
*/
package org.apache.rocketmq.store.schedule;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
import org.apache.rocketmq.common.ConfigManager;
import org.apache.rocketmq.common.TopicFilterType;
import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.topic.TopicValidator;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.running.RunningStats;
+import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.ConsumeQueue;
import org.apache.rocketmq.store.ConsumeQueueExt;
import org.apache.rocketmq.store.DefaultMessageStore;
@@ -221,6 +224,17 @@ public class ScheduleMessageService extends ConfigManager {
return true;
}
+ /**
+  * Chooses a delay level for a message that should become due at
+  * {@code timeMillis}.
+  * <p>
+  * The entries of {@code delayLevelTable} are ordered by their delay value and
+  * the first level whose delay is strictly greater than the time remaining
+  * until {@code timeMillis} is returned. When no level is long enough, the
+  * level with the largest delay is returned instead.
+  *
+  * @param timeMillis absolute target time, compared against
+  * {@link System#currentTimeMillis()}.
+  * @return the key of the selected entry in {@code delayLevelTable}.
+  * @throws IndexOutOfBoundsException if {@code delayLevelTable} is empty.
+  */
+ public int computeDelayLevel(long timeMillis) {
+     final long remaining = timeMillis - System.currentTimeMillis();
+     final List<Map.Entry<Integer, Long>> ascending = delayLevelTable.entrySet()
+         .stream()
+         .sorted(Comparator.comparingLong(Map.Entry::getValue))
+         .collect(Collectors.toList());
+     return ascending.stream()
+         .filter(level -> level.getValue() > remaining)
+         .findFirst()
+         // nothing is long enough: fall back to the longest configured delay
+         .orElseGet(() -> ascending.get(ascending.size() - 1))
+         .getKey();
+ }
+
class DeliverDelayedMessageTimerTask extends TimerTask {
private final int delayLevel;
private final long offset;