You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2018/08/21 03:17:33 UTC

[GitHub] vongosling closed pull request #420: 第7组 全局排序的Consumer

vongosling closed pull request #420: 第7组 全局排序的Consumer
URL: https://github.com/apache/rocketmq/pull/420
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/OrderedConsumer.java b/example/src/main/java/org/apache/rocketmq/example/simple/OrderedConsumer.java
new file mode 100644
index 000000000..fd1fbdef9
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/OrderedConsumer.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.rocketmq.example.simple;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+//import java.lang.reflect.Array;
+import java.util.HashMap;
+//import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import java.util.Queue;
+import java.util.Comparator;
+import java.util.PriorityQueue;
+
+public class OrderedConsumer {
+
+    private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();
+
+    public static void main(String[] args) throws MQClientException {
+
+        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
+        consumer.start();
+
+
+        Queue<MessageExt> integerPriorityQueue = new PriorityQueue<>(1024,idComparator);
+
+        //Queue<MessageExt> queue = new LinkedList<MessageExt>();
+
+
+
+
+        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest");
+
+        for (MessageQueue mq : mqs) {
+            try {
+                PullResult pullResult =
+                        consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 1);
+
+                System.out.printf("%s%n", pullResult);
+
+                for (MessageExt mes : pullResult.getMsgFoundList()) {
+                    integerPriorityQueue.add(mes);
+                }
+                putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
+
+
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+
+
+        boolean isEmpty = false;
+        int queueId = 0;
+
+
+        while (true) {
+
+            MessageExt lastMessage = integerPriorityQueue.poll();
+            System.out.printf("Time: %d%n",lastMessage.getBornTimestamp());
+            // you can print or deal  the message, this message is
+
+
+
+
+            queueId = lastMessage.getQueueId();
+
+            for (MessageQueue mq : mqs) {
+                if (mq.getQueueId() != queueId)
+                {
+                    continue;
+                }
+
+                try {
+                    PullResult pullResult =
+                            consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 1);
+
+                    System.out.printf("QueueID:%d %s%n",mq.getQueueId(), pullResult);
+
+                    for (MessageExt mes : pullResult.getMsgFoundList()) {
+                        integerPriorityQueue.add(mes);
+                    }
+                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
+
+                    switch (pullResult.getPullStatus()) {
+                        case FOUND:
+                            break;
+                        case NO_MATCHED_MSG:
+                            break;
+                        case NO_NEW_MSG:
+                            isEmpty = true;
+                            break ;
+                        case OFFSET_ILLEGAL:
+                            break;
+                        default:
+                            break;
+                    }
+
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+            if (isEmpty)
+            {
+                break;
+            }
+        }
+        consumer.shutdown();
+    }
+
+
+
+    public static Comparator<MessageExt> idComparator = new Comparator<MessageExt>() {
+        @Override
+        public int compare(MessageExt c1, MessageExt c2) {
+            if (c1.getBornTimestamp() > c2.getBornTimestamp()) {
+                return 1;
+            }
+            else
+            {
+                return 0;
+            }
+        }
+    };
+
+
+    private static long getMessageQueueOffset(MessageQueue mq) {
+        Long offset = OFFSE_TABLE.get(mq);
+        if (offset != null)
+            return offset;
+
+        return 0;
+    }
+
+    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
+        OFFSE_TABLE.put(mq, offset);
+    }
+}
diff --git a/store/src/main/java/org/apache/rocketmq/store/NonPersistentConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/NonPersistentConsumeQueue.java
new file mode 100644
index 000000000..e102e56b1
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/NonPersistentConsumeQueue.java
@@ -0,0 +1,73 @@
+package org.apache.rocketmq.store;
+
+import java.util.concurrent.CopyOnWriteArrayList;
+
+public class NonPersistentConsumeQueue {
+    private String topic;
+    private int queueId;
+    private long maxOffsetInQueue;
+    private long minOffsetInQueue;
+    private long maxOffset;
+    private long minOffset;
+
+    private final CopyOnWriteArrayList<NonPersistentMsg> msgCopyOnWriteArrayList = new CopyOnWriteArrayList<NonPersistentMsg>();
+
+    private NonPersistentMessageStore nonPersistentMessageStore;
+
+    public NonPersistentConsumeQueue() {
+    }
+
+//    public NonPersistentConsumeQueue(NonPersistentMessageStore msgStore){
+//        this.nonPersistentMessageStore = msgStore;
+//    }
+
+
+    public NonPersistentMessageStore getNonPersistentMessageStore() {
+        return nonPersistentMessageStore;
+    }
+
+
+    public NonPersistentConsumeQueue(String topic, int queueId, NonPersistentMessageStore msgStore){
+        this.queueId = queueId;
+        this.topic = topic;
+        this.nonPersistentMessageStore = msgStore;
+    }
+
+    public PutMessageResult putMessage(MessageExtBrokerInner msg){
+        AppendMessageResult result = null;
+        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
+        return putMessageResult;
+    }
+
+    public CopyOnWriteArrayList<NonPersistentMsg> getMsgCopyOnWriteArrayList() {
+        return msgCopyOnWriteArrayList;
+    }
+
+    public long getMaxOffsetInQueue(){
+        return maxOffsetInQueue;
+    }
+
+    public long getMinOffsetInQueue() {
+        return minOffsetInQueue;
+    }
+
+    public long getMaxOffset() {
+        return maxOffset;
+    }
+
+    public long getMinOffset() {
+        return minOffset;
+    }
+
+    //根据offset获取msg
+    public SelectMappedBufferResult getMessageResult(long offset){
+        NonPersistentMsg msg = msgCopyOnWriteArrayList.get((int)offset);
+        SelectMappedBufferResult selectedMappedBufferResult = new SelectMappedBufferResult(offset, msg);
+        return selectedMappedBufferResult;
+    }
+
+    public SelectMappedBufferResult getMessage(long offset, int size){
+        SelectMappedBufferResult selectedMappedBufferResult = new SelectMappedBufferResult(offset, size);
+        return selectedMappedBufferResult;
+    }
+}
diff --git a/store/src/main/java/org/apache/rocketmq/store/NonPersistentMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/NonPersistentMessageStore.java
new file mode 100644
index 000000000..1ef01a57c
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/NonPersistentMessageStore.java
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store;
+
+import org.apache.rocketmq.common.*;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageExtBatch;
+import org.apache.rocketmq.common.running.RunningStats;
+import org.apache.rocketmq.common.sysflag.MessageSysFlag;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.store.config.BrokerRole;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.config.StorePathConfigHelper;
+import org.apache.rocketmq.store.ha.HAService;
+import org.apache.rocketmq.store.index.QueryOffsetResult;
+import org.apache.rocketmq.store.schedule.ScheduleMessageService;
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileLock;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class NonPersistentMessageStore{
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+
+    private final MessageStoreConfig messageStoreConfig;
+
+    //talbe根据topic获取consumeQueue
+    private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, NonPersistentConsumeQueue>> nonPersistentConsumeQueueTable;
+
+    private final NonPersistentConsumeQueue msgQueueLog;
+
+    private final StoreStatsService storeStatsService;
+
+    private final TransientStorePool transientStorePool;
+
+    private final RunningFlags runningFlags = new RunningFlags();
+
+    private final ScheduledExecutorService scheduledExecutorService =
+        Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread"));
+    private final BrokerStatsManager brokerStatsManager;
+    private final MessageArrivingListener messageArrivingListener;
+    private final BrokerConfig brokerConfig;
+
+    private volatile boolean shutdown = true;
+
+    private AtomicLong printTimes = new AtomicLong(0);
+
+    private final LinkedList<CommitLogDispatcher> dispatcherList;
+
+    PutMessageLock putMessageLock;
+
+
+    public NonPersistentMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
+                                     final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
+        this.messageArrivingListener = messageArrivingListener;
+        this.brokerConfig = brokerConfig;
+        this.messageStoreConfig = messageStoreConfig;
+        this.brokerStatsManager = brokerStatsManager;
+
+        //
+        this.nonPersistentConsumeQueueTable = new ConcurrentHashMap<>(32);
+        this.msgQueueLog = new NonPersistentConsumeQueue(this);
+
+        this.storeStatsService = new StoreStatsService();
+
+        this.transientStorePool = new TransientStorePool(messageStoreConfig);
+
+        if (messageStoreConfig.isTransientStorePoolEnable()) {
+            this.transientStorePool.init();
+        }
+
+        this.dispatcherList = new LinkedList<>();
+    }
+
+    /**
+     * @throws Exception
+     */
+    public void start() throws Exception {
+
+        this.storeStatsService.start();
+        this.shutdown = false;
+    }
+
+    public void shutdown() {
+        if (!this.shutdown) {
+            this.shutdown = true;
+
+            this.scheduledExecutorService.shutdown();
+
+            try {
+
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                log.error("shutdown Exception, ", e);
+            }
+
+            this.storeStatsService.shutdown();
+        }
+
+        this.transientStorePool.destroy();
+
+    }
+
+    /**
+     * 非持久化putMessage
+     * 在JVM中通过CopyOnWriteArrayList实现消息的同步非持久化存储,将消息存入到对应Topic、queueId的两层CurrentMap中
+     */
+    public PutMessageResult putMessage(MessageExtBrokerInner msg) {
+        if (this.shutdown) {
+            log.warn("message store has shutdown, so putMessage is forbidden");
+            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
+        }
+
+        if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
+            long value = this.printTimes.getAndIncrement();
+            if ((value % 50000) == 0) {
+                log.warn("message store is slave mode, so putMessage is forbidden ");
+            }
+
+            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
+        }
+
+        if (!this.runningFlags.isWriteable()) {
+            long value = this.printTimes.getAndIncrement();
+            if ((value % 50000) == 0) {
+                log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());
+            }
+
+            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
+        } else {
+            this.printTimes.set(0);
+        }
+
+        if (msg.getTopic().length() > Byte.MAX_VALUE) {
+            log.warn("putMessage message topic length too long " + msg.getTopic().length());
+            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
+        }
+
+        if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
+            log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
+            return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
+        }
+
+        PutMessageResult putMessageResult;
+        AppendMessageResult result = null;
+        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
+        try {
+            String topic = msg.getTopic();
+            int queueId = msg.getQueueId();
+            String propertiesString = msg.getPropertiesString();
+            NonPersistentMsg msg1 = new NonPersistentMsg();
+            msg1.setTopic(topic);
+            msg1.setQueueId(queueId);
+            msg1.setMessageProperties(propertiesString);
+
+            ConcurrentMap<Integer/* queueId */, NonPersistentConsumeQueue> map = this.nonPersistentConsumeQueueTable.get(topic);
+            if (null == map){
+                map = new ConcurrentHashMap<Integer, NonPersistentConsumeQueue>();
+            }
+            msgQueueLog.getMsgCopyOnWriteArrayList().addIfAbsent(msg1);
+            map.putIfAbsent(queueId, msgQueueLog);
+            nonPersistentConsumeQueueTable.putIfAbsent(topic, map);
+        } finally {
+            putMessageLock.unlock();
+        }
+        putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
+
+        if (!putMessageResult.isOk()) {
+            this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
+        }
+
+        return putMessageResult;
+    }
+
+    /**
+     * 非持久化getMessage,从CopyOnWriteArrayList根据offset获取消息
+     */
+    public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
+        final int maxMsgNums,
+        final MessageFilter messageFilter) {
+        if (this.shutdown) {
+            log.warn("message store has shutdown, so getMessage is forbidden");
+            return null;
+        }
+
+        if (!this.runningFlags.isReadable()) {
+            log.warn("message store is not readable, so getMessage is forbidden " + this.runningFlags.getFlagBits());
+            return null;
+        }
+
+        GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
+        long nextBeginOffset = offset;
+        long minOffset = 0;
+        long maxOffset = 0;
+
+        GetMessageResult getResult = new GetMessageResult();
+
+        //获取物理最大偏移量 TODO
+        final long maxOffsetPy = this.msgQueueLog.getMaxOffset();
+
+        //队列
+        NonPersistentConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
+        if (consumeQueue != null) {
+            minOffset = consumeQueue.getMinOffsetInQueue();
+            maxOffset = consumeQueue.getMaxOffsetInQueue();
+
+            if (maxOffset == 0) {
+                status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
+            } else if (offset < minOffset) {
+                status = GetMessageStatus.OFFSET_TOO_SMALL;
+            } else if (offset == maxOffset) {
+                status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
+            } else if (offset > maxOffset) {
+                status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
+            } else {
+                //根据偏移量获取队列
+
+                SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getMessageResult(offset);
+                if (bufferConsumeQueue != null) {
+                    try {
+                        status = GetMessageStatus.NO_MATCHED_MESSAGE;
+
+                        long nextPhyFileStartOffset = Long.MIN_VALUE;
+                        long maxPhyOffsetPulling = 0;
+
+                        int i = 0;
+                        final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE);
+
+                        ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
+                        for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
+                            long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
+                            int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
+                            long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();
+
+                            maxPhyOffsetPulling = offsetPy;
+
+                            if (nextPhyFileStartOffset != Long.MIN_VALUE) {
+                                if (offsetPy < nextPhyFileStartOffset)
+                                    continue;
+                            }
+
+                            boolean extRet = false, isTagsCodeLegal = true;
+
+                            if (messageFilter != null
+                                && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
+                                if (getResult.getBufferTotalSize() == 0) {
+                                    status = GetMessageStatus.NO_MATCHED_MESSAGE;
+                                }
+
+                                continue;
+                            }
+
+                            SelectMappedBufferResult selectResult = this.msgQueueLog.getMessage(offsetPy, sizePy);
+                            if (null == selectResult) {
+                                if (getResult.getBufferTotalSize() == 0) {
+                                    status = GetMessageStatus.MESSAGE_WAS_REMOVING;
+                                }
+
+                                continue;
+                            }
+
+                            if (messageFilter != null
+                                && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {
+                                if (getResult.getBufferTotalSize() == 0) {
+                                    status = GetMessageStatus.NO_MATCHED_MESSAGE;
+                                }
+                                // release...
+                                selectResult.release();
+                                continue;
+                            }
+
+                            this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();
+                            getResult.addMessage(selectResult);
+                            status = GetMessageStatus.FOUND;
+                            nextPhyFileStartOffset = Long.MIN_VALUE;
+                        }
+
+
+                        nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
+
+                        long diff = maxOffsetPy - maxPhyOffsetPulling;
+                        long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
+                            * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
+                        getResult.setSuggestPullingFromSlave(diff > memory);
+                    } finally {
+
+                        bufferConsumeQueue.release();
+                    }
+                } else {
+                    status = GetMessageStatus.OFFSET_FOUND_NULL;
+                    log.warn("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset + " maxOffset: "
+                        + maxOffset + ", but access logic queue failed.");
+                }
+            }
+        } else {
+            status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
+        }
+
+        if (GetMessageStatus.FOUND == status) {
+            this.storeStatsService.getGetMessageTimesTotalFound().incrementAndGet();
+        } else {
+            this.storeStatsService.getGetMessageTimesTotalMiss().incrementAndGet();
+        }
+
+        getResult.setStatus(status);
+        getResult.setNextBeginOffset(nextBeginOffset);
+        getResult.setMaxOffset(maxOffset);
+        getResult.setMinOffset(minOffset);
+        return getResult;
+    }
+    /**
+     * 根据topic,queueID获取消息队列,这个消息队列是个CopyOnWriteArrayList实现
+     */
+    public NonPersistentConsumeQueue findConsumeQueue(String topic, int queueId) {
+        ConcurrentMap<Integer, NonPersistentConsumeQueue> map = nonPersistentConsumeQueueTable.get(topic);
+        if (null == map) {
+            ConcurrentMap<Integer, NonPersistentConsumeQueue> newMap = new ConcurrentHashMap<Integer, NonPersistentConsumeQueue>(128);
+            ConcurrentMap<Integer, NonPersistentConsumeQueue> oldMap = nonPersistentConsumeQueueTable.putIfAbsent(topic, newMap);
+            if (oldMap != null) {
+                map = oldMap;
+            } else {
+                map = newMap;
+            }
+        }
+
+        NonPersistentConsumeQueue logic = map.get(queueId);
+        if (null == logic) {
+            NonPersistentConsumeQueue newLogic = new NonPersistentConsumeQueue(
+                    topic,
+                    queueId,
+                    this);
+            NonPersistentConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
+            if (oldLogic != null) {
+                logic = oldLogic;
+            } else {
+                logic = newLogic;
+            }
+        }
+
+        return logic;
+    }
+
+    public long getMaxOffsetInQueue(String topic, int queueId) {
+        NonPersistentConsumeQueue logic = this.findConsumeQueue(topic, queueId);
+        if (logic != null) {
+            long offset = logic.getMaxOffsetInQueue();
+            return offset;
+        }
+
+        return 0;
+    }
+
+    public long getMinOffsetInQueue(String topic, int queueId) {
+        NonPersistentConsumeQueue logic = this.findConsumeQueue(topic, queueId);
+        if (logic != null) {
+            return logic.getMinOffsetInQueue();
+        }
+
+        return -1;
+    }
+
+    public long getCommitLogOffsetInQueue(String topic, int queueId, long consumeQueueOffset) {
+        NonPersistentConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
+        if (consumeQueue != null) {
+            SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getMessageResult(consumeQueueOffset);
+            if (bufferConsumeQueue != null) {
+                try {
+                    long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
+                    return offsetPy;
+                } finally {
+                    bufferConsumeQueue.release();
+                }
+            }
+        }
+
+        return 0;
+    }
+
+    public MessageExt lookMessageByOffset(long commitLogOffset) {
+        SelectMappedBufferResult sbr = this.msgQueueLog.getMessage(commitLogOffset, 4);
+        if (null != sbr) {
+            try {
+                // 1 TOTALSIZE
+                int size = sbr.getByteBuffer().getInt();
+                return lookMessageByOffset(commitLogOffset, size);
+            } finally {
+                sbr.release();
+            }
+        }
+
+        return null;
+    }
+
+    public MessageExt lookMessageByOffset(long commitLogOffset, int size) {
+        SelectMappedBufferResult sbr = this.msgQueueLog.getMessage(commitLogOffset, size);
+        if (null != sbr) {
+            try {
+                return MessageDecoder.decode(sbr.getByteBuffer(), true, false);
+            } finally {
+                sbr.release();
+            }
+        }
+
+        return null;
+    }
+
+    public SelectMappedBufferResult getCommitLogData(final long offset) {
+        if (this.shutdown) {
+            log.warn("message store has shutdown, so getPhyQueueData is forbidden");
+            return null;
+        }
+
+        return this.msgQueueLog.getMessageResult(offset);
+    }
+}
diff --git a/store/src/main/java/org/apache/rocketmq/store/NonPersistentMsg.java b/store/src/main/java/org/apache/rocketmq/store/NonPersistentMsg.java
new file mode 100644
index 000000000..14b613e1c
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/NonPersistentMsg.java
@@ -0,0 +1,31 @@
+package org.apache.rocketmq.store;
+
+public class NonPersistentMsg {
+    String topic;
+    int queueId;
+    String messageProperties;
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public int getQueueId() {
+        return queueId;
+    }
+
+    public void setQueueId(int queueId) {
+        this.queueId = queueId;
+    }
+
+    public String getMessageProperties() {
+        return messageProperties;
+    }
+
+    public void setMessageProperties(String messageProperties) {
+        this.messageProperties = messageProperties;
+    }
+}
diff --git a/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java b/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java
index 7a17114c8..9097ad4de 100644
--- a/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java
+++ b/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java
@@ -28,6 +28,8 @@
 
     private MappedFile mappedFile;
 
+    private NonPersistentMsg nonPersistentMsg;
+
     public SelectMappedBufferResult(long startOffset, ByteBuffer byteBuffer, int size, MappedFile mappedFile) {
         this.startOffset = startOffset;
         this.byteBuffer = byteBuffer;
@@ -35,6 +37,20 @@ public SelectMappedBufferResult(long startOffset, ByteBuffer byteBuffer, int siz
         this.mappedFile = mappedFile;
     }
 
+    public SelectMappedBufferResult() {
+
+    }
+
+    public SelectMappedBufferResult(long startOffset, int size) {
+        this.startOffset = startOffset;
+        this.size = size;
+    }
+
+    public SelectMappedBufferResult(long startOffset, NonPersistentMsg msg) {
+        this.startOffset = startOffset;
+        this.nonPersistentMsg = msg;
+    }
+
     public ByteBuffer getByteBuffer() {
         return byteBuffer;
     }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services