You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/19 09:40:27 UTC

[10/43] incubator-rocketmq git commit: Finish code dump. Reviewed by: @yukon @vongosling @stevenschew @vintagewang @lollipop @zander

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/DefaultMessageStore.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/DefaultMessageStore.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/DefaultMessageStore.java
new file mode 100644
index 0000000..c922c8d
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/DefaultMessageStore.java
@@ -0,0 +1,1748 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.store;
+
+import com.alibaba.rocketmq.common.*;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.common.message.MessageDecoder;
+import com.alibaba.rocketmq.common.message.MessageExt;
+import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import com.alibaba.rocketmq.common.running.RunningStats;
+import com.alibaba.rocketmq.common.sysflag.MessageSysFlag;
+import com.alibaba.rocketmq.store.config.BrokerRole;
+import com.alibaba.rocketmq.store.config.MessageStoreConfig;
+import com.alibaba.rocketmq.store.config.StorePathConfigHelper;
+import com.alibaba.rocketmq.store.ha.HAService;
+import com.alibaba.rocketmq.store.index.IndexService;
+import com.alibaba.rocketmq.store.index.QueryOffsetResult;
+import com.alibaba.rocketmq.store.schedule.ScheduleMessageService;
+import com.alibaba.rocketmq.store.stats.BrokerStatsManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static com.alibaba.rocketmq.store.config.BrokerRole.SLAVE;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class DefaultMessageStore implements MessageStore {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+    // Decides in getMessage() whether a consume-queue entry's tags code matches the subscription.
+    private final MessageFilter messageFilter = new DefaultMessageFilter();
+    // Store configuration handed to the constructor.
+    private final MessageStoreConfig messageStoreConfig;
+    // Commit log that putMessage() appends to and that the read paths fetch message data from.
+    private final CommitLog commitLog;
+    // Consume queues, indexed by topic and then by queue id.
+    private final ConcurrentHashMap<String/* topic */, ConcurrentHashMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable;
+    // Started in start() and stopped in shutdown().
+    private final FlushConsumeQueueService flushConsumeQueueService;
+    // Backs excuteDeleteFilesManualy().
+    private final CleanCommitLogService cleanCommitLogService;
+
+    private final CleanConsumeQueueService cleanConsumeQueueService;
+    // Key index; queryMessage() asks it for commit-log offsets.
+    private final IndexService indexService;
+    // Started in the constructor; stopped when load() fails and in shutdown().
+    private final AllocateMappedFileService allocateMappedFileService;
+    // Woken up by appendToCommitLog() after data has been appended.
+    private final ReputMessageService reputMessageService;
+    // Started in start(); also supplies the offset used by slaveFallBehindMuch().
+    private final HAService haService;
+    // Loaded in load(); start() does not start it when the broker role is SLAVE.
+    private final ScheduleMessageService scheduleMessageService;
+    // Counters and timing maxima updated by putMessage() and getMessage().
+    private final StoreStatsService storeStatsService;
+    // Initialised only when isTransientStorePoolEnable() is true; always destroyed in shutdown().
+    private final TransientStorePool transientStorePool;
+    // Readable/writeable flags checked before getMessage()/putMessage() do any work.
+    private final RunningFlags runningFlags = new RunningFlags();
+    private final SystemClock systemClock = new SystemClock();
+    // Single-threaded scheduler; it is the first thing shut down in shutdown().
+    private final ScheduledExecutorService scheduledExecutorService =
+            Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread"));
+    private final BrokerStatsManager brokerStatsManager;
+    private final MessageArrivingListener messageArrivingListener;
+    private final BrokerConfig brokerConfig;
+    // True until start() has completed, and true again as soon as shutdown() begins.
+    private volatile boolean shutdown = true;
+    // Created in load(); flushed and shut down in shutdown().
+    private StoreCheckpoint storeCheckpoint;
+    // Throttles the "putMessage is forbidden" warnings to one per 50000 rejected calls.
+    private AtomicLong printTimes = new AtomicLong(0);
+    /** Builds the store and its services; the mapped-file allocator and the index service are started here. */
+    public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
+                               final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
+        this.messageArrivingListener = messageArrivingListener;
+        this.brokerConfig = brokerConfig;
+        this.messageStoreConfig = messageStoreConfig;
+        this.brokerStatsManager = brokerStatsManager;
+        this.allocateMappedFileService = new AllocateMappedFileService(this); // 'this' is handed out before construction completes
+        this.commitLog = new CommitLog(this);
+        this.consumeQueueTable = new ConcurrentHashMap<>(32);
+
+        this.flushConsumeQueueService = new FlushConsumeQueueService();
+        this.cleanCommitLogService = new CleanCommitLogService();
+        this.cleanConsumeQueueService = new CleanConsumeQueueService();
+        this.storeStatsService = new StoreStatsService();
+        this.indexService = new IndexService(this);
+        this.haService = new HAService(this);
+
+        this.reputMessageService = new ReputMessageService();
+
+        this.scheduleMessageService = new ScheduleMessageService(this);
+
+        this.transientStorePool = new TransientStorePool(messageStoreConfig);
+        // The pool object always exists, but it is only initialised when enabled in the configuration.
+        if (messageStoreConfig.isTransientStorePoolEnable()) {
+            this.transientStorePool.init();
+        }
+
+        // These two services are started here; every other service is started later, in start().
+        this.allocateMappedFileService.start();
+
+        this.indexService.start();
+    }
+
+
+    /**
+     * Asks every consume queue of every topic to truncate its dirty logic files, using the given
+     * physical offset.
+     *
+     * @param phyOffset physical offset passed unchanged to each {@code ConsumeQueue}
+     */
+    public void truncateDirtyLogicFiles(long phyOffset) {
+        for (ConcurrentHashMap<Integer, ConsumeQueue> queuesOfTopic : this.consumeQueueTable.values()) {
+            for (ConsumeQueue consumeQueue : queuesOfTopic.values()) {
+                consumeQueue.truncateDirtyLogicFiles(phyOffset);
+            }
+        }
+    }
+
+
+    /**
+     * Loads the schedule service, the commit log and the consume queues, in that order, stopping at
+     * the first one that fails. When all three succeed, the store checkpoint is created, the index
+     * is loaded and recovery is run.
+     *
+     * <p>No exception escapes: any failure is logged and reported as {@code false}. On failure the
+     * mapped-file allocation service (started by the constructor) is shut down.
+     *
+     * @return {@code true} when everything was loaded, {@code false} otherwise
+     */
+    public boolean load() {
+        boolean loaded;
+
+        try {
+            // A leftover temp file is taken to mean that the previous run did not exit normally.
+            final boolean lastExitOK = !this.isTempFileExist();
+            log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");
+
+            // Short-circuit chain: a later stage is only attempted when the earlier ones succeeded.
+            loaded = (null == this.scheduleMessageService || this.scheduleMessageService.load())
+                    && this.commitLog.load()   // load Commit Log
+                    && this.loadConsumeQueue(); // load Consume Queue
+
+            if (loaded) {
+                final String checkpointPath =
+                        StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir());
+                this.storeCheckpoint = new StoreCheckpoint(checkpointPath);
+
+                this.indexService.load(lastExitOK);
+                this.recover(lastExitOK);
+
+                log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
+            }
+        } catch (Exception e) {
+            log.error("load exception", e);
+            loaded = false;
+        }
+
+        if (!loaded) {
+            this.allocateMappedFileService.shutdown();
+        }
+
+        return loaded;
+    }
+
+    /**
+     * Starts the store's services, creates the temp file, registers the scheduled tasks and finally
+     * clears the {@code shutdown} flag so that reads and writes are accepted.
+     *
+     * @throws Exception if one of the started services or helper steps throws
+     */
+    public void start() throws Exception {
+        this.flushConsumeQueueService.start();
+        this.commitLog.start();
+        this.storeStatsService.start();
+
+        // The schedule service is not started when the broker role is SLAVE.
+        if (this.scheduleMessageService != null) {
+            if (SLAVE != this.messageStoreConfig.getBrokerRole()) {
+                this.scheduleMessageService.start();
+            }
+        }
+
+        // Reput starts at the confirm offset when duplication is enabled, otherwise at the max offset.
+        final long reputFromOffset = this.getMessageStoreConfig().isDuplicationEnable()
+                ? this.commitLog.getConfirmOffset()
+                : this.commitLog.getMaxOffset();
+        this.reputMessageService.setReputFromOffset(reputFromOffset);
+        this.reputMessageService.start();
+
+        this.haService.start();
+
+        this.createTempFile();
+        this.addScheduleTask();
+
+        // Only now do putMessage()/getMessage() stop rejecting calls.
+        this.shutdown = false;
+    }
+
+    /**
+
+     */
+    public void shutdown() {
+        if (!this.shutdown) {
+            this.shutdown = true;
+
+            this.scheduledExecutorService.shutdown();
+
+            try {
+
+                Thread.sleep(1000 * 3);
+            } catch (InterruptedException e) {
+                log.error("shutdown Exception, ", e);
+            }
+
+            if (this.scheduleMessageService != null) {
+                this.scheduleMessageService.shutdown();
+            }
+
+            this.haService.shutdown();
+
+            this.storeStatsService.shutdown();
+            this.indexService.shutdown();
+            this.commitLog.shutdown();
+            this.reputMessageService.shutdown();
+            this.flushConsumeQueueService.shutdown();
+            this.allocateMappedFileService.shutdown();
+            this.storeCheckpoint.flush();
+            this.storeCheckpoint.shutdown();
+
+            if (this.runningFlags.isWriteable()) {
+                this.deleteFile(StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir()));
+            } else {
+                log.warn("the store may be wrong, so shutdown abnormally, and keep abort file.");
+            }
+        }
+
+        this.transientStorePool.destroy();
+    }
+
+    /**
+     * Destroys the consume queues, the commit log and the index, then deletes the abort file and the
+     * store checkpoint file under the configured root directory.
+     */
+    public void destroy() {
+        this.destroyLogics();
+        this.commitLog.destroy();
+        this.indexService.destroy();
+
+        final String abortFile = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());
+        this.deleteFile(abortFile);
+
+        final String checkpointFile = StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir());
+        this.deleteFile(checkpointFile);
+    }
+
+    /**
+     * Calls {@code destroy()} on every consume queue of every topic. The entries themselves stay in
+     * the consume-queue table.
+     */
+    public void destroyLogics() {
+        for (ConcurrentHashMap<Integer, ConsumeQueue> queuesOfTopic : this.consumeQueueTable.values()) {
+            for (ConsumeQueue consumeQueue : queuesOfTopic.values()) {
+                consumeQueue.destroy();
+            }
+        }
+    }
+
+    /**
+     * Validates a message and appends it to the commit log.
+     *
+     * <p>The call is rejected with {@code SERVICE_NOT_AVAILABLE} when the store is shut down, runs
+     * as SLAVE or is not writeable; with {@code MESSAGE_ILLEGAL} when the topic is longer than
+     * {@code Byte.MAX_VALUE}; with {@code PROPERTIES_SIZE_EXCEEDED} when the properties string is
+     * longer than {@code Short.MAX_VALUE}; and with {@code OS_PAGECACHE_BUSY} when
+     * {@link #isOSPageCacheBusy()} reports true.
+     *
+     * @param msg the message to store
+     * @return the rejection result, or whatever the commit log returned
+     */
+    public PutMessageResult putMessage(MessageExtBrokerInner msg) {
+        if (this.shutdown) {
+            log.warn("message store has shutdown, so putMessage is forbidden");
+            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
+        }
+
+        if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
+            // Warn only once per 50000 rejected calls to keep the log readable.
+            final long rejected = this.printTimes.getAndIncrement();
+            if (0 == (rejected % 50000)) {
+                log.warn("message store is slave mode, so putMessage is forbidden ");
+            }
+
+            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
+        }
+
+        if (!this.runningFlags.isWriteable()) {
+            final long rejected = this.printTimes.getAndIncrement();
+            if (0 == (rejected % 50000)) {
+                log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());
+            }
+
+            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
+        }
+
+        // Writeable again: reset the warning throttle.
+        this.printTimes.set(0);
+
+        final int topicLength = msg.getTopic().length();
+        if (topicLength > Byte.MAX_VALUE) {
+            log.warn("putMessage message topic length too long " + topicLength);
+            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
+        }
+
+        final String properties = msg.getPropertiesString();
+        if (properties != null && properties.length() > Short.MAX_VALUE) {
+            log.warn("putMessage message properties length too long " + properties.length());
+            return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
+        }
+
+        if (this.isOSPageCacheBusy()) {
+            return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
+        }
+
+        final long startedAt = this.getSystemClock().now();
+        final PutMessageResult putResult = this.commitLog.putMessage(msg);
+        final long elapsed = this.getSystemClock().now() - startedAt;
+
+        if (elapsed > 500) {
+            log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", elapsed, msg.getBody().length);
+        }
+        this.storeStatsService.setPutMessageEntireTimeMax(elapsed);
+
+        final boolean failed = (null == putResult) || !putResult.isOk();
+        if (failed) {
+            this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
+        }
+
+        return putResult;
+    }
+
+    /**
+     * Reports whether the time since the commit log's {@code getBeginTimeInLock()} value exceeds the
+     * configured page-cache-busy timeout.
+     *
+     * <p>Differences of 10000000 ms or more are ignored.
+     * NOTE(review): presumably this filters out the case where no lock is held and the begin time
+     * is a sentinel such as 0 - confirm against {@code CommitLog}.
+     *
+     * @return {@code true} when the store should reject writes as "page cache busy"
+     */
+    @Override
+    public boolean isOSPageCacheBusy() {
+        final long lockBegin = this.getCommitLog().getBeginTimeInLock();
+        final long heldFor = this.systemClock.now() - lockBegin;
+
+        return heldFor < 10000000 //
+                && heldFor > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills();
+    }
+
+    /**
+     * Delegates to the commit log's {@code lockTimeMills()}.
+     *
+     * @return the value reported by the commit log
+     */
+    @Override
+    public long lockTimeMills() {
+        final long lockTime = this.commitLog.lockTimeMills();
+        return lockTime;
+    }
+
+    /**
+     * @return the clock this store uses for all of its time measurements
+     */
+    public SystemClock getSystemClock() {
+        return this.systemClock;
+    }
+
+    /**
+     * @return the commit log owned by this store
+     */
+    public CommitLog getCommitLog() {
+        return this.commitLog;
+    }
+
+    /**
+     * Pulls messages from one consume queue, starting at {@code offset}.
+     *
+     * <p>The requested offset is first checked against the queue's min/max offsets. If it is out of
+     * range, nothing is read and a corrected {@code nextBeginOffset} is returned. Otherwise the
+     * consume-queue entries from {@code offset} onwards are scanned: entries whose tags code does
+     * not match {@code subscriptionData} are skipped, and matching messages are fetched from the
+     * commit log and attached to the result until {@code isTheBatchFull} says the batch is full.
+     *
+     * @param group            consumer group; used only for the disk fall-behind statistic
+     * @param topic            topic to read from
+     * @param queueId          queue of the topic to read from
+     * @param offset           consume-queue offset to start reading at
+     * @param maxMsgNums       requested upper bound on messages, enforced through {@code isTheBatchFull}
+     * @param subscriptionData subscription handed to the message filter
+     * @return a result carrying the status, the next/min/max offsets and any messages found, or
+     *         {@code null} when the store is shut down or not readable
+     */
+    public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset, final int maxMsgNums,
+                                       final SubscriptionData subscriptionData) {
+        if (this.shutdown) {
+            log.warn("message store has shutdown, so getMessage is forbidden");
+            return null;
+        }
+
+        if (!this.runningFlags.isReadable()) {
+            log.warn("message store is not readable, so getMessage is forbidden " + this.runningFlags.getFlagBits());
+            return null;
+        }
+
+        long beginTime = this.getSystemClock().now();
+
+        GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
+        long nextBeginOffset = offset;
+        long minOffset = 0;
+        long maxOffset = 0;
+
+        GetMessageResult getResult = new GetMessageResult();
+
+        // Commit-log max offset, sampled once so that every entry is judged against the same value.
+        final long maxOffsetPy = this.commitLog.getMaxOffset();
+
+        ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
+        if (consumeQueue != null) {
+            minOffset = consumeQueue.getMinOffsetInQuque();
+            maxOffset = consumeQueue.getMaxOffsetInQuque();
+
+            if (maxOffset == 0) {
+                status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
+                nextBeginOffset = nextOffsetCorrection(offset, 0);
+            } else if (offset < minOffset) {
+                status = GetMessageStatus.OFFSET_TOO_SMALL;
+                nextBeginOffset = nextOffsetCorrection(offset, minOffset);
+            } else if (offset == maxOffset) {
+                status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
+                nextBeginOffset = nextOffsetCorrection(offset, offset);
+            } else if (offset > maxOffset) {
+                status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
+                if (0 == minOffset) {
+                    nextBeginOffset = nextOffsetCorrection(offset, minOffset);
+                } else {
+                    nextBeginOffset = nextOffsetCorrection(offset, maxOffset);
+                }
+            } else {
+                // minOffset <= offset < maxOffset: the entry should exist in the consume queue.
+                SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
+                if (bufferConsumeQueue != null) {
+                    try {
+                        status = GetMessageStatus.NO_MATCHED_MESSAGE;
+
+                        // Long.MIN_VALUE means "no commit-log file is currently being skipped".
+                        long nextPhyFileStartOffset = Long.MIN_VALUE;
+                        long maxPhyOffsetPulling = 0;
+
+                        // i counts BYTES of the consume-queue buffer and advances one entry per
+                        // iteration, so the cap below is compared against a byte position.
+                        int i = 0;
+                        final int maxFilterMessageCount = 16000;
+                        final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();
+                        for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
+                            // One entry as read here: commit-log offset, message size, tags code.
+                            long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
+                            int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
+                            long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();
+
+                            maxPhyOffsetPulling = offsetPy;
+
+                            // After a failed commit-log read, skip the entries that point below
+                            // the start of the next commit-log file.
+                            if (nextPhyFileStartOffset != Long.MIN_VALUE) {
+                                if (offsetPy < nextPhyFileStartOffset) {
+                                    continue;
+                                }
+                            }
+
+                            boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
+
+                            if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(),
+                                    isInDisk)) {
+                                break;
+                            }
+
+                            if (this.messageFilter.isMessageMatched(subscriptionData, tagsCode)) {
+                                SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
+                                if (selectResult != null) {
+                                    this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();
+                                    getResult.addMessage(selectResult);
+                                    status = GetMessageStatus.FOUND;
+                                    nextPhyFileStartOffset = Long.MIN_VALUE;
+                                } else {
+                                    // The commit log could not serve the entry; report it only
+                                    // when nothing has been collected so far.
+                                    if (getResult.getBufferTotalSize() == 0) {
+                                        status = GetMessageStatus.MESSAGE_WAS_REMOVING;
+                                    }
+
+                                    nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
+                                }
+                            } else {
+                                if (getResult.getBufferTotalSize() == 0) {
+                                    status = GetMessageStatus.NO_MATCHED_MESSAGE;
+                                }
+
+                                if (log.isDebugEnabled()) {
+                                    log.debug("message type not matched, client: " + subscriptionData + " server: " + tagsCode);
+                                }
+                            }
+                        }
+
+                        if (diskFallRecorded) {
+                            long fallBehind = maxOffsetPy - maxPhyOffsetPulling;
+                            brokerStatsManager.recordDiskFallBehindSize(group, topic, queueId, fallBehind);
+                        }
+
+                        // Convert the byte position back into a number of consumed entries.
+                        nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
+
+                        // Suggest pulling from a slave when the reader lags behind the commit-log
+                        // head by more than the configured share of physical memory.
+                        long diff = maxOffsetPy - maxPhyOffsetPulling;
+                        long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
+                                * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
+                        getResult.setSuggestPullingFromSlave(diff > memory);
+                    } finally {
+                        // Always release the consume-queue buffer obtained from getIndexBuffer().
+                        bufferConsumeQueue.release();
+                    }
+                } else {
+                    status = GetMessageStatus.OFFSET_FOUND_NULL;
+                    nextBeginOffset = nextOffsetCorrection(offset, consumeQueue.rollNextFile(offset));
+                    // The previous message ran the topic straight into "offset:"; the fields are now separated.
+                    log.warn("consumer request topic: {} offset: {} minOffset: {} maxOffset: {}, but access logic queue failed.",
+                            topic, offset, minOffset, maxOffset);
+                }
+            }
+        } else {
+            status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
+            nextBeginOffset = nextOffsetCorrection(offset, 0);
+        }
+
+        if (GetMessageStatus.FOUND == status) {
+            this.storeStatsService.getGetMessageTimesTotalFound().incrementAndGet();
+        } else {
+            this.storeStatsService.getGetMessageTimesTotalMiss().incrementAndGet();
+        }
+        long eclipseTime = this.getSystemClock().now() - beginTime;
+        this.storeStatsService.setGetMessageEntireTimeMax(eclipseTime);
+
+        getResult.setStatus(status);
+        getResult.setNextBeginOffset(nextBeginOffset);
+        getResult.setMaxOffset(maxOffset);
+        getResult.setMinOffset(minOffset);
+        return getResult;
+    }
+
+    /**
+     * Returns the max offset of the given consume queue.
+     *
+     * @param topic   topic of the queue
+     * @param queueId id of the queue
+     * @return the queue's max offset, or {@code 0} when no such consume queue is found
+     */
+    public long getMaxOffsetInQuque(String topic, int queueId) {
+        final ConsumeQueue consumeQueue = this.findConsumeQueue(topic, queueId);
+        if (null == consumeQueue) {
+            return 0;
+        }
+
+        return consumeQueue.getMaxOffsetInQuque();
+    }
+
+    /**
+     * Returns the min offset of the given consume queue.
+     *
+     * @param topic   topic of the queue
+     * @param queueId id of the queue
+     * @return the queue's min offset, or {@code -1} when no such consume queue is found
+     */
+    public long getMinOffsetInQuque(String topic, int queueId) {
+        final ConsumeQueue consumeQueue = this.findConsumeQueue(topic, queueId);
+        if (null == consumeQueue) {
+            return -1;
+        }
+
+        return consumeQueue.getMinOffsetInQuque();
+    }
+
+    /**
+     * Reads the commit-log offset stored in the consume-queue entry at {@code cqOffset}.
+     *
+     * @param topic    topic of the queue
+     * @param queueId  id of the queue
+     * @param cqOffset consume-queue offset of the entry
+     * @return the first long of the entry, or {@code 0} when the queue or the entry is not found
+     */
+    @Override
+    public long getCommitLogOffsetInQueue(String topic, int queueId, long cqOffset) {
+        final ConsumeQueue queue = findConsumeQueue(topic, queueId);
+        if (null == queue) {
+            return 0;
+        }
+
+        final SelectMappedBufferResult entry = queue.getIndexBuffer(cqOffset);
+        if (null == entry) {
+            return 0;
+        }
+
+        try {
+            return entry.getByteBuffer().getLong();
+        } finally {
+            entry.release();
+        }
+    }
+
+    /**
+     * Delegates to the consume queue's {@code getOffsetInQueueByTime(timestamp)}.
+     *
+     * @param topic     topic of the queue
+     * @param queueId   id of the queue
+     * @param timestamp timestamp passed on to the consume queue
+     * @return the offset the consume queue reports, or {@code 0} when no such consume queue is found
+     */
+    public long getOffsetInQueueByTime(String topic, int queueId, long timestamp) {
+        final ConsumeQueue consumeQueue = this.findConsumeQueue(topic, queueId);
+        if (null == consumeQueue) {
+            return 0;
+        }
+
+        return consumeQueue.getOffsetInQueueByTime(timestamp);
+    }
+
+    /**
+     * Looks up the message stored at a commit-log offset when its size is not known in advance.
+     * The first four bytes at the offset are read as the total size, and the lookup is then
+     * delegated to {@code lookMessageByOffset(long, int)}.
+     *
+     * @param commitLogOffset commit-log offset of the message
+     * @return the message, or {@code null} when the commit log returns nothing for the offset
+     */
+    public MessageExt lookMessageByOffset(long commitLogOffset) {
+        final SelectMappedBufferResult sizeHeader = this.commitLog.getMessage(commitLogOffset, 4);
+        if (sizeHeader == null) {
+            return null;
+        }
+
+        try {
+            // 1 TOTALSIZE
+            final int totalSize = sizeHeader.getByteBuffer().getInt();
+            return lookMessageByOffset(commitLogOffset, totalSize);
+        } finally {
+            sizeHeader.release();
+        }
+    }
+
+    /**
+     * Selects the raw buffer of the message stored at a commit-log offset when its size is not
+     * known in advance. The first four bytes at the offset are read as the total size.
+     *
+     * @param commitLogOffset commit-log offset of the message
+     * @return the buffer returned by the commit log for that offset and size, or {@code null} when
+     *         the size header cannot be read
+     */
+    @Override
+    public SelectMappedBufferResult selectOneMessageByOffset(long commitLogOffset) {
+        final SelectMappedBufferResult sizeHeader = this.commitLog.getMessage(commitLogOffset, 4);
+        if (sizeHeader == null) {
+            return null;
+        }
+
+        try {
+            // 1 TOTALSIZE
+            final int totalSize = sizeHeader.getByteBuffer().getInt();
+            return this.commitLog.getMessage(commitLogOffset, totalSize);
+        } finally {
+            // Only the 4-byte header buffer is released here; the returned buffer is the caller's.
+            sizeHeader.release();
+        }
+    }
+
+    /**
+     * Selects the raw buffer of a message whose size is already known.
+     *
+     * @param commitLogOffset commit-log offset of the message
+     * @param msgSize         size of the message in bytes
+     * @return whatever the commit log returns for that offset and size
+     */
+    @Override
+    public SelectMappedBufferResult selectOneMessageByOffset(long commitLogOffset, int msgSize) {
+        final SelectMappedBufferResult selected = this.commitLog.getMessage(commitLogOffset, msgSize);
+        return selected;
+    }
+
+    /**
+     * @return the string form of the store statistics service
+     */
+    public String getRunningDataInfo() {
+        final String statsText = this.storeStatsService.toString();
+        return statsText;
+    }
+
+    /**
+     * Collects runtime information: the statistics service's own entries, the disk usage ratios of
+     * the commit-log and consume-queue paths, the schedule service's running stats, and the min/max
+     * commit-log offsets.
+     *
+     * @return the map obtained from the statistics service, with the extra entries added to it
+     */
+    @Override
+    public HashMap<String, String> getRuntimeInfo() {
+        final HashMap<String, String> runtimeInfo = this.storeStatsService.getRuntimeInfo();
+
+        // Disk usage of the partition holding the commit log.
+        final String commitLogPath = this.getMessageStoreConfig().getStorePathCommitLog();
+        final double commitLogRatio = UtilAll.getDiskPartitionSpaceUsedPercent(commitLogPath);
+        runtimeInfo.put(RunningStats.commitLogDiskRatio.name(), String.valueOf(commitLogRatio));
+
+        // Disk usage of the partition holding the consume queues.
+        final String consumeQueuePath =
+                StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir());
+        final double consumeQueueRatio = UtilAll.getDiskPartitionSpaceUsedPercent(consumeQueuePath);
+        runtimeInfo.put(RunningStats.consumeQueueDiskRatio.name(), String.valueOf(consumeQueueRatio));
+
+        if (null != this.scheduleMessageService) {
+            this.scheduleMessageService.buildRunningStats(runtimeInfo);
+        }
+
+        runtimeInfo.put(RunningStats.commitLogMinOffset.name(), String.valueOf(this.getMinPhyOffset()));
+        runtimeInfo.put(RunningStats.commitLogMaxOffset.name(), String.valueOf(this.getMaxPhyOffset()));
+
+        return runtimeInfo;
+    }
+
+    /**
+     * @return the commit log's max offset
+     */
+    @Override
+    public long getMaxPhyOffset() {
+        final long maxOffset = this.commitLog.getMaxOffset();
+        return maxOffset;
+    }
+
+    /**
+     * @return the commit log's min offset
+     */
+    @Override
+    public long getMinPhyOffset() {
+        final long minOffset = this.commitLog.getMinOffset();
+        return minOffset;
+    }
+
+    /**
+     * Returns the store timestamp of the message referenced by the first entry of a consume queue.
+     *
+     * <p>The entry at the queue's min logic offset is read, and the commit log is asked for the
+     * store timestamp of the message it points to.
+     *
+     * @param topic   topic of the queue
+     * @param queueId id of the queue
+     * @return the store timestamp, or {@code -1} when the queue or the entry is not found or the
+     *         lookup fails (the failure is logged)
+     */
+    @Override
+    public long getEarliestMessageTime(String topic, int queueId) {
+        ConsumeQueue logicQueue = this.findConsumeQueue(topic, queueId);
+        if (logicQueue != null) {
+            long minLogicOffset = logicQueue.getMinLogicOffset();
+
+            // The min logic offset is a byte position; dividing by the unit size gives the entry index.
+            SelectMappedBufferResult result = logicQueue.getIndexBuffer(minLogicOffset / ConsumeQueue.CQ_STORE_UNIT_SIZE);
+            if (result != null) {
+                try {
+                    final long phyOffset = result.getByteBuffer().getLong();
+                    final int size = result.getByteBuffer().getInt();
+                    return this.getCommitLog().pickupStoreTimestamp(phyOffset, size);
+                } catch (Exception e) {
+                    // Best effort: still fall through to -1, but no longer hide the cause.
+                    log.warn("getEarliestMessageTime failed, topic={} queueId={}", topic, queueId, e);
+                } finally {
+                    result.release();
+                }
+            }
+        }
+
+        return -1;
+    }
+
+    /**
+     * Asks the commit log for the store timestamp at its min physical offset, passing twice the
+     * configured max message size as the size argument.
+     *
+     * @return the timestamp reported by the commit log
+     */
+    @Override
+    public long getEarliestMessageTime() {
+        final long firstPhyOffset = this.getMinPhyOffset();
+        final int readSize = this.messageStoreConfig.getMaxMessageSize() * 2;
+        final CommitLog log4Read = this.getCommitLog();
+        return log4Read.pickupStoreTimestamp(firstPhyOffset, readSize);
+    }
+
+    /**
+     * Returns the store timestamp of the message referenced by the consume-queue entry at
+     * {@code offset}.
+     *
+     * @param topic   topic of the queue
+     * @param queueId id of the queue
+     * @param offset  consume-queue offset of the entry
+     * @return the store timestamp, or {@code -1} when the queue or the entry is not found or the
+     *         lookup fails (the failure is logged)
+     */
+    @Override
+    public long getMessageStoreTimeStamp(String topic, int queueId, long offset) {
+        ConsumeQueue logicQueue = this.findConsumeQueue(topic, queueId);
+        if (logicQueue != null) {
+            SelectMappedBufferResult result = logicQueue.getIndexBuffer(offset);
+            if (result != null) {
+                try {
+                    final long phyOffset = result.getByteBuffer().getLong();
+                    final int size = result.getByteBuffer().getInt();
+                    return this.getCommitLog().pickupStoreTimestamp(phyOffset, size);
+                } catch (Exception e) {
+                    // Best effort: still fall through to -1, but no longer hide the cause.
+                    log.warn("getMessageStoreTimeStamp failed, topic={} queueId={} offset={}", topic, queueId, offset, e);
+                } finally {
+                    result.release();
+                }
+            }
+        }
+
+        return -1;
+    }
+
+    /**
+     * Delegates to the consume queue's {@code getMessageTotalInQueue()}.
+     *
+     * @param topic   topic of the queue
+     * @param queueId id of the queue
+     * @return the total reported by the consume queue, or {@code -1} when no such queue is found
+     */
+    @Override
+    public long getMessageTotalInQueue(String topic, int queueId) {
+        final ConsumeQueue consumeQueue = this.findConsumeQueue(topic, queueId);
+        if (null == consumeQueue) {
+            return -1;
+        }
+
+        return consumeQueue.getMessageTotalInQueue();
+    }
+
+    /**
+     * Returns raw commit-log data starting at the given offset.
+     *
+     * @param offset commit-log offset to read from
+     * @return the data returned by the commit log, or {@code null} when the store is shut down
+     */
+    @Override
+    public SelectMappedBufferResult getCommitLogData(final long offset) {
+        if (!this.shutdown) {
+            return this.commitLog.getData(offset);
+        }
+
+        log.warn("message store has shutdown, so getPhyQueueData is forbidden");
+        return null;
+    }
+
+    /**
+     * Appends raw data to the commit log at the given offset and, on success, wakes up the reput
+     * service.
+     *
+     * @param startOffset commit-log offset the data belongs at
+     * @param data        bytes to append
+     * @return {@code true} when the commit log accepted the data; {@code false} when it did not or
+     *         when the store is shut down
+     */
+    @Override
+    public boolean appendToCommitLog(long startOffset, byte[] data) {
+        if (this.shutdown) {
+            log.warn("message store has shutdown, so appendToPhyQueue is forbidden");
+            return false;
+        }
+
+        final boolean appended = this.commitLog.appendData(startOffset, data);
+        if (!appended) {
+            log.error("appendToPhyQueue failed " + startOffset + " " + data.length);
+            return false;
+        }
+
+        this.reputMessageService.wakeup();
+        return true;
+    }
+
+    /**
+     * Forwards a manual delete request to the commit-log cleaning service.
+     */
+    @Override
+    public void excuteDeleteFilesManualy() {
+        final CleanCommitLogService cleaner = this.cleanCommitLogService;
+        cleaner.excuteDeleteFilesManualy();
+    }
+
+    /**
+     * Looks up messages by topic and key through the index service.
+     *
+     * <p>Up to three index queries are made. Each query's physical offsets are sorted, and the raw
+     * data at each offset is added to the result. If a round adds no data, the next round's upper
+     * time bound becomes the store timestamp of the message at the lowest offset. The loop stops as
+     * soon as the index returns nothing, data has been collected, or that timestamp is before
+     * {@code begin}.
+     *
+     * @param topic  topic to search in
+     * @param key    message key to search for
+     * @param maxNum maximum number of offsets requested from the index per query
+     * @param begin  lower bound of the time range
+     * @param end    upper bound of the time range for the first query
+     * @return the collected messages, possibly empty, together with the index's last-update markers
+     */
+    @Override
+    public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, long end) {
+        QueryMessageResult queryMessageResult = new QueryMessageResult();
+
+        long lastQueryMsgTime = end;
+
+        for (int i = 0; i < 3; i++) {
+            QueryOffsetResult queryOffsetResult = this.indexService.queryOffset(topic, key, maxNum, begin, lastQueryMsgTime);
+            if (queryOffsetResult.getPhyOffsets().isEmpty()) {
+                break;
+            }
+
+            Collections.sort(queryOffsetResult.getPhyOffsets());
+
+            queryMessageResult.setIndexLastUpdatePhyoffset(queryOffsetResult.getIndexLastUpdatePhyoffset());
+            queryMessageResult.setIndexLastUpdateTimestamp(queryOffsetResult.getIndexLastUpdateTimestamp());
+
+            for (int m = 0; m < queryOffsetResult.getPhyOffsets().size(); m++) {
+                long offset = queryOffsetResult.getPhyOffsets().get(m);
+
+                try {
+                    // Decoded for every hit, as before: if decoding throws, the catch below skips the hit.
+                    MessageExt msg = this.lookMessageByOffset(offset);
+
+                    // lookMessageByOffset() may return null; previously that raised a
+                    // NullPointerException here, which was only caught by the broad catch below.
+                    if (0 == m && msg != null) {
+                        lastQueryMsgTime = msg.getStoreTimestamp();
+                    }
+
+                    // No per-key check is done here, so every offset the index returned is added.
+                    SelectMappedBufferResult result = this.commitLog.getData(offset, false);
+                    if (result != null) {
+                        // The first int of the data is the message's total size; trim the buffer to it.
+                        int size = result.getByteBuffer().getInt(0);
+                        result.getByteBuffer().limit(size);
+                        result.setSize(size);
+                        queryMessageResult.addMessage(result);
+                    }
+                } catch (Exception e) {
+                    log.error("queryMessage exception", e);
+                }
+            }
+
+            if (queryMessageResult.getBufferTotalSize() > 0) {
+                break;
+            }
+
+            if (lastQueryMsgTime < begin) {
+                break;
+            }
+        }
+
+        return queryMessageResult;
+    }
+
+    /**
+     * Forwards the new master address to the HA service.
+     *
+     * @param newAddr address passed unchanged to {@code HAService.updateMasterAddress}
+     */
+    @Override
+    public void updateHaMasterAddress(String newAddr) {
+        final HAService ha = this.haService;
+        ha.updateMasterAddress(newAddr);
+    }
+
+    /**
+     * @return the commit log's max offset minus the HA service's push-to-slave max offset
+     */
+    @Override
+    public long slaveFallBehindMuch() {
+        final long commitLogMax = this.commitLog.getMaxOffset();
+        final long pushedToSlave = this.haService.getPush2SlaveMaxOffset().get();
+        return commitLogMax - pushedToSlave;
+    }
+
+    /**
+     * @return the current time as reported by this store's {@code SystemClock}
+     */
+    @Override
+    public long now() {
+        final long current = this.systemClock.now();
+        return current;
+    }
+
+    /**
+     * Destroys and removes the consume queues of every topic that is not in {@code topics}. The
+     * schedule topic is always kept.
+     *
+     * <p>For each removed queue, the matching entry is also removed from the commit log's
+     * topic-queue table.
+     *
+     * @param topics the topics to keep
+     * @return always {@code 0}
+     */
+    @Override
+    public int cleanUnusedTopic(Set<String> topics) {
+        final Iterator<Entry<String, ConcurrentHashMap<Integer, ConsumeQueue>>> topicIt =
+                this.consumeQueueTable.entrySet().iterator();
+
+        while (topicIt.hasNext()) {
+            final Entry<String, ConcurrentHashMap<Integer, ConsumeQueue>> topicEntry = topicIt.next();
+            final String topic = topicEntry.getKey();
+
+            final boolean keep = topics.contains(topic) || topic.equals(ScheduleMessageService.SCHEDULE_TOPIC);
+            if (keep) {
+                continue;
+            }
+
+            for (ConsumeQueue queue : topicEntry.getValue().values()) {
+                queue.destroy();
+                log.info("cleanUnusedTopic: {} {} ConsumeQueue cleaned", //
+                        queue.getTopic(), //
+                        queue.getQueueId() //
+                );
+
+                this.commitLog.removeQueueFromTopicQueueTable(queue.getTopic(), queue.getQueueId());
+            }
+
+            // Remove through the iterator so that the traversal itself stays valid.
+            topicIt.remove();
+
+            log.info("cleanUnusedTopic: {},topic destroyed", topic);
+        }
+
+        return 0;
+    }
+
+    /**
+     * Destroys consume queues whose last offset is below the commit log's current minimum offset,
+     * and removes topics that end up with no queues. The schedule topic is skipped, and a queue
+     * reporting a last offset of {@code -1} is only logged, not removed.
+     */
+    public void cleanExpiredConsumerQueue() {
+        final long minCommitLogOffset = this.commitLog.getMinOffset();
+
+        final Iterator<Entry<String, ConcurrentHashMap<Integer, ConsumeQueue>>> topicIt = this.consumeQueueTable.entrySet().iterator();
+        while (topicIt.hasNext()) {
+            final Entry<String, ConcurrentHashMap<Integer, ConsumeQueue>> topicEntry = topicIt.next();
+            final String topic = topicEntry.getKey();
+            if (topic.equals(ScheduleMessageService.SCHEDULE_TOPIC)) {
+                continue;
+            }
+
+            final ConcurrentHashMap<Integer, ConsumeQueue> queueTable = topicEntry.getValue();
+            final Iterator<Entry<Integer, ConsumeQueue>> queueIt = queueTable.entrySet().iterator();
+            while (queueIt.hasNext()) {
+                final Entry<Integer, ConsumeQueue> queueEntry = queueIt.next();
+                final ConsumeQueue queue = queueEntry.getValue();
+                final long maxCLOffsetInConsumeQueue = queue.getLastOffset();
+
+                if (maxCLOffsetInConsumeQueue == -1) {
+                    log.warn("maybe ConsumeQueue was created just now. topic={} queueId={} maxPhysicOffset={} minLogicOffset={}.", //
+                            queue.getTopic(), //
+                            queue.getQueueId(), //
+                            queue.getMaxPhysicOffset(), //
+                            queue.getMinLogicOffset());
+                    continue;
+                }
+
+                if (maxCLOffsetInConsumeQueue >= minCommitLogOffset) {
+                    continue;
+                }
+
+                log.info(
+                        "cleanExpiredConsumerQueue: {} {} consumer queue destroyed, minCommitLogOffset: {} maxCLOffsetInConsumeQueue: {}", //
+                        topic, //
+                        queueEntry.getKey(), //
+                        minCommitLogOffset, //
+                        maxCLOffsetInConsumeQueue);
+
+                this.commitLog.removeQueueFromTopicQueueTable(queue.getTopic(), queue.getQueueId());
+
+                queue.destroy();
+                queueIt.remove();
+            }
+
+            if (queueTable.isEmpty()) {
+                log.info("cleanExpiredConsumerQueue: {},topic destroyed", topic);
+                topicIt.remove();
+            }
+        }
+    }
+
+    /**
+     * Builds a map from message id to consume queue offset for one queue.
+     *
+     * <p>The requested range is first clamped to the range the consume queue reports. Each
+     * consume queue entry occupies {@code ConsumeQueue.CQ_STORE_UNIT_SIZE} bytes and starts with
+     * the 8-byte physical (commit log) offset, which is combined with {@code storeHost} to form
+     * the message id.
+     *
+     * @param topic     topic of the queue
+     * @param queueId   id of the queue
+     * @param minOffset first consume queue offset to look at
+     * @param maxOffset upper bound of the consume queue offsets to look at
+     * @param storeHost address encoded into every generated message id
+     * @return message id to consume queue offset; empty when the store is shut down, the clamped
+     * upper bound is {@code 0}, or no index data is available
+     */
+    public Map<String, Long> getMessageIds(final String topic, final int queueId, long minOffset, long maxOffset, SocketAddress storeHost) {
+        Map<String, Long> messageIds = new HashMap<String, Long>();
+        if (this.shutdown) {
+            return messageIds;
+        }
+
+        ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
+        if (consumeQueue != null) {
+            minOffset = Math.max(minOffset, consumeQueue.getMinOffsetInQuque());
+            maxOffset = Math.min(maxOffset, consumeQueue.getMaxOffsetInQuque());
+
+            if (maxOffset == 0) {
+                return messageIds;
+            }
+
+            long nextOffset = minOffset;
+            while (nextOffset < maxOffset) {
+                SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(nextOffset);
+                if (bufferConsumeQueue != null) {
+                    try {
+                        final ByteBuffer indexBuffer = bufferConsumeQueue.getByteBuffer();
+                        // Read each entry at its own start. A relative getLong() only advances
+                        // 8 bytes, so every entry after the first would be read from the middle
+                        // of the previous one and produce a bogus physical offset.
+                        final int base = indexBuffer.position();
+                        for (int i = 0; i < bufferConsumeQueue.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
+                            long offsetPy = indexBuffer.getLong(base + i);
+                            final ByteBuffer msgIdMemory = ByteBuffer.allocate(MessageDecoder.MSG_ID_LENGTH);
+                            String msgId =
+                                    MessageDecoder.createMessageId(msgIdMemory, MessageExt.socketAddress2ByteBuffer(storeHost), offsetPy);
+                            messageIds.put(msgId, nextOffset++);
+                            if (nextOffset > maxOffset) {
+                                return messageIds;
+                            }
+                        }
+                    } finally {
+                        // Always give the mapped buffer back, including on the early return above.
+                        bufferConsumeQueue.release();
+                    }
+                } else {
+                    return messageIds;
+                }
+            }
+        }
+        return messageIds;
+    }
+
+    /**
+     * Checks whether the message referenced by a consume queue offset counts as being on disk,
+     * as decided by {@code checkInDiskByCommitOffset} for the first entry of the index buffer.
+     *
+     * @param topic         topic of the queue
+     * @param queueId       id of the queue
+     * @param consumeOffset consume queue offset to look up
+     * @return the result of {@code checkInDiskByCommitOffset}; {@code false} when the queue, the
+     * index buffer, or any index data is missing
+     */
+    @Override
+    public boolean checkInDiskByConsumeOffset(final String topic, final int queueId, long consumeOffset) {
+
+        final long maxOffsetPy = this.commitLog.getMaxOffset();
+
+        final ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
+        if (consumeQueue == null) {
+            return false;
+        }
+
+        final SelectMappedBufferResult indexBuffer = consumeQueue.getIndexBuffer(consumeOffset);
+        if (indexBuffer == null) {
+            return false;
+        }
+
+        try {
+            // Only the first entry matters: its leading long is the physical offset.
+            if (indexBuffer.getSize() > 0) {
+                final long offsetPy = indexBuffer.getByteBuffer().getLong();
+                return checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
+            }
+            return false;
+        } finally {
+            indexBuffer.release();
+        }
+    }
+
+    /**
+     * @return the value of {@code reputMessageService.behind()}
+     */
+    public long dispatchBehindBytes() {
+        final long behindBytes = this.reputMessageService.behind();
+        return behindBytes;
+    }
+
+    /**
+     * Flushes the commit log.
+     *
+     * @return whatever {@code commitLog.flush()} returns
+     */
+    @Override
+    public long flush() {
+        final long flushResult = this.commitLog.flush();
+        return flushResult;
+    }
+
+    /**
+     * Asks the commit log to reset its offset.
+     *
+     * @param phyOffset physical offset passed to {@code commitLog.resetOffset}
+     * @return the result reported by the commit log
+     */
+    @Override
+    public boolean resetWriteOffset(long phyOffset) {
+        final boolean resetDone = this.commitLog.resetOffset(phyOffset);
+        return resetDone;
+    }
+
+    /**
+     * @return the confirm offset currently held by the commit log
+     */
+    @Override
+    public long getConfirmOffset() {
+        final long confirmOffset = this.commitLog.getConfirmOffset();
+        return confirmOffset;
+    }
+
+    /**
+     * Stores a new confirm offset in the commit log.
+     *
+     * @param phyOffset value passed unchanged to {@code commitLog.setConfirmOffset}
+     */
+    @Override
+    public void setConfirmOffset(long phyOffset) {
+        this.commitLog.setConfirmOffset(phyOffset);
+    }
+
+    /**
+     * Reads and decodes one message from the commit log.
+     *
+     * @param commitLogOffset offset of the message in the commit log
+     * @param size            number of bytes to read
+     * @return the decoded message, or {@code null} when the commit log returns no data
+     */
+    public MessageExt lookMessageByOffset(long commitLogOffset, int size) {
+        final SelectMappedBufferResult selected = this.commitLog.getMessage(commitLogOffset, size);
+        if (selected == null) {
+            return null;
+        }
+
+        try {
+            return MessageDecoder.decode(selected.getByteBuffer(), true, false);
+        } finally {
+            // Release the mapped buffer whether or not decoding succeeded.
+            selected.release();
+        }
+    }
+
+    /**
+     * Looks up the consume queue for a topic/queue pair, creating and registering the
+     * per-topic table and the queue itself when they do not exist yet. Registration uses
+     * {@code putIfAbsent}, so an instance registered concurrently by someone else wins.
+     *
+     * @param topic   topic name
+     * @param queueId queue id within the topic
+     * @return the registered consume queue, never {@code null}
+     */
+    public ConsumeQueue findConsumeQueue(String topic, int queueId) {
+        ConcurrentHashMap<Integer, ConsumeQueue> queues = consumeQueueTable.get(topic);
+        if (queues == null) {
+            final ConcurrentHashMap<Integer, ConsumeQueue> created = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
+            final ConcurrentHashMap<Integer, ConsumeQueue> existing = consumeQueueTable.putIfAbsent(topic, created);
+            queues = (existing != null) ? existing : created;
+        }
+
+        final ConsumeQueue known = queues.get(queueId);
+        if (known != null) {
+            return known;
+        }
+
+        final ConsumeQueue created = new ConsumeQueue(//
+                topic, //
+                queueId, //
+                StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), //
+                this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(), //
+                this);
+        final ConsumeQueue existing = queues.putIfAbsent(queueId, created);
+        return (existing != null) ? existing : created;
+    }
+
+    private long nextOffsetCorrection(long oldOffset, long newOffset) {
+        long nextOffset = oldOffset;
+        if (this.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE || this.getMessageStoreConfig().isOffsetCheckInSlave()) {
+            nextOffset = newOffset;
+        }
+        return nextOffset;
+    }
+
+    private boolean checkInDiskByCommitOffset(long offsetPy, long maxOffsetPy) {
+        long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
+        return (maxOffsetPy - offsetPy) > memory;
+    }
+
+    /**
+     * Tells whether one more message of {@code sizePy} bytes may still be added to a batch.
+     *
+     * <p>An empty batch is never full. Otherwise the batch is full once it already holds
+     * {@code maxMsgNums} messages, or when adding the next message would exceed the configured
+     * byte or count limit for in-disk or in-memory transfers.
+     *
+     * @param sizePy       size in bytes of the message about to be added
+     * @param maxMsgNums   maximum number of messages requested for the batch
+     * @param bufferTotal  bytes already collected in the batch
+     * @param messageTotal messages already collected in the batch
+     * @param isInDisk     selects the in-disk limits when {@code true}, the in-memory limits otherwise
+     * @return {@code true} when no further message should be added
+     */
+    private boolean isTheBatchFull(int sizePy, int maxMsgNums, int bufferTotal, int messageTotal, boolean isInDisk) {
+
+        if (0 == bufferTotal || 0 == messageTotal) {
+            return false;
+        }
+
+        // Full only once the requested count has been reached. The previous
+        // "(messageTotal + 1) >= maxMsgNums" stopped one message short of the request.
+        if (messageTotal >= maxMsgNums) {
+            return true;
+        }
+
+        if (isInDisk) {
+            if ((bufferTotal + sizePy) > this.messageStoreConfig.getMaxTransferBytesOnMessageInDisk()) {
+                return true;
+            }
+
+            if ((messageTotal + 1) > this.messageStoreConfig.getMaxTransferCountOnMessageInDisk()) {
+                return true;
+            }
+        } else {
+            if ((bufferTotal + sizePy) > this.messageStoreConfig.getMaxTransferBytesOnMessageInMemory()) {
+                return true;
+            }
+
+            if ((messageTotal + 1) > this.messageStoreConfig.getMaxTransferCountOnMessageInMemory()) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    private void deleteFile(final String fileName) {
+        File file = new File(fileName);
+        boolean result = file.delete();
+        log.info(fileName + (result ? " delete OK" : " delete Failed"));
+    }
+
+    /**
+     * @throws IOException
+     */
+    private void createTempFile() throws IOException {
+        String fileName = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());
+        File file = new File(fileName);
+        MappedFile.ensureDirOK(file.getParent());
+        boolean result = file.createNewFile();
+        log.info(fileName + (result ? " create OK" : " already exists"));
+    }
+
+    /**
+     * Registers the store's periodic background tasks on {@code scheduledExecutorService}:
+     * <ul>
+     * <li>file cleaning, first after 60 seconds and then every configured clean-resource interval
+     * (milliseconds);</li>
+     * <li>a self check, first after 1 minute and then every 10 minutes;</li>
+     * <li>a lock watchdog every second which, when lock debugging is enabled and the commit log
+     * lock has been held for more than one second, writes a thread dump below
+     * {@code ~/debug/lock}.</li>
+     * </ul>
+     */
+    private void addScheduleTask() {
+
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                DefaultMessageStore.this.cleanFilesPeriodically();
+            }
+        }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);
+
+
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                DefaultMessageStore.this.checkSelf();
+            }
+        }, 1, 10, TimeUnit.MINUTES);
+
+
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                if (DefaultMessageStore.this.getMessageStoreConfig().isDebugLockEnable()) {
+                    try {
+                        if (DefaultMessageStore.this.commitLog.getBeginTimeInLock() != 0) {
+                            long lockTime = System.currentTimeMillis() - DefaultMessageStore.this.commitLog.getBeginTimeInLock();
+                            if (lockTime > 1000 && lockTime < 10000000) {
+
+                                String stack = UtilAll.jstack();
+                                final String fileName = System.getProperty("user.home") + File.separator + "debug/lock/stack-"
+                                        + DefaultMessageStore.this.commitLog.getBeginTimeInLock() + "-" + lockTime;
+                                MixAll.string2FileNotSafe(stack, fileName);
+                            }
+                        }
+                    } catch (Exception e) {
+                        // Do not rethrow: an exception escaping a fixed-rate task cancels all of
+                        // its future runs. Report it instead of dropping it silently.
+                        log.warn("dump stack for commit log lock debugging failed", e);
+                    }
+                }
+            }
+        }, 1, 1, TimeUnit.SECONDS);
+
+        // Disabled: hourly cleanup of expired consume queues.
+        // this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+        // @Override
+        // public void run() {
+        // DefaultMessageStore.this.cleanExpiredConsumerQueue();
+        // }
+        // }, 1, 1, TimeUnit.HOURS);
+    }
+
+    /**
+     * Runs one cleaning pass: the commit log cleaner first, then the consume queue cleaner.
+     */
+    private void cleanFilesPeriodically() {
+        this.cleanCommitLogService.run();
+        this.cleanConsumeQueueService.run();
+    }
+
+    /**
+     * Runs the self check of the commit log and then of every registered consume queue.
+     */
+    private void checkSelf() {
+        this.commitLog.checkSelf();
+
+        for (Entry<String, ConcurrentHashMap<Integer, ConsumeQueue>> topicEntry : this.consumeQueueTable.entrySet()) {
+            for (Entry<Integer, ConsumeQueue> queueEntry : topicEntry.getValue().entrySet()) {
+                queueEntry.getValue().checkSelf();
+            }
+        }
+    }
+
+    private boolean isTempFileExist() {
+        String fileName = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());
+        File file = new File(fileName);
+        return file.exists();
+    }
+
+    private boolean loadConsumeQueue() {
+        File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()));
+        File[] fileTopicList = dirLogic.listFiles();
+        if (fileTopicList != null) {
+
+            for (File fileTopic : fileTopicList) {
+                String topic = fileTopic.getName();
+
+                File[] fileQueueIdList = fileTopic.listFiles();
+                if (fileQueueIdList != null) {
+                    for (File fileQueueId : fileQueueIdList) {
+                        int queueId;
+                        try {
+                            queueId = Integer.parseInt(fileQueueId.getName());
+                        } catch (NumberFormatException e) {
+                            continue;
+                        }
+                        ConsumeQueue logic = new ConsumeQueue(
+                                topic,
+                                queueId,
+                                StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
+                                this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),
+                                this);
+                        this.putConsumeQueue(topic, queueId, logic);
+                        if (!logic.load()) {
+                            return false;
+                        }
+                    }
+                }
+            }
+        }
+
+        log.info("load logics queue all over, OK");
+
+        return true;
+    }
+
+    /**
+     * Recovers the store in three steps: consume queues first, then the commit log, then the
+     * topic/queue offset table.
+     *
+     * @param lastExitOK selects normal commit log recovery when {@code true}, abnormal recovery otherwise
+     */
+    private void recover(final boolean lastExitOK) {
+        this.recoverConsumeQueue();
+
+        if (!lastExitOK) {
+            this.commitLog.recoverAbnormally();
+        } else {
+            this.commitLog.recoverNormally();
+        }
+
+        this.recoverTopicQueueTable();
+    }
+
+    /**
+     * @return the configuration object this store holds
+     */
+    public MessageStoreConfig getMessageStoreConfig() {
+        return this.messageStoreConfig;
+    }
+
+    /**
+     * @return the transient store pool held by this store
+     */
+    public TransientStorePool getTransientStorePool() {
+        return this.transientStorePool;
+    }
+
+    private void putConsumeQueue(final String topic, final int queueId, final ConsumeQueue consumeQueue) {
+        ConcurrentHashMap<Integer/* queueId */, ConsumeQueue> map = this.consumeQueueTable.get(topic);
+        if (null == map) {
+            map = new ConcurrentHashMap<Integer/* queueId */, ConsumeQueue>();
+            map.put(queueId, consumeQueue);
+            this.consumeQueueTable.put(topic, map);
+        } else {
+            map.put(queueId, consumeQueue);
+        }
+    }
+
+    /**
+     * Calls {@code recover()} on every registered consume queue.
+     */
+    private void recoverConsumeQueue() {
+        for (ConcurrentHashMap<Integer, ConsumeQueue> queuesOfTopic : this.consumeQueueTable.values()) {
+            for (ConsumeQueue queue : queuesOfTopic.values()) {
+                queue.recover();
+            }
+        }
+    }
+
+    private void recoverTopicQueueTable() {
+        HashMap<String/* topic-queueid */, Long/* offset */> table = new HashMap<String, Long>(1024);
+        long minPhyOffset = this.commitLog.getMinOffset();
+        for (ConcurrentHashMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
+            for (ConsumeQueue logic : maps.values()) {
+                String key = logic.getTopic() + "-" + logic.getQueueId();
+                table.put(key, logic.getMaxOffsetInQuque());
+                logic.correctMinOffset(minPhyOffset);
+            }
+        }
+
+        this.commitLog.setTopicQueueTable(table);
+    }
+
+    /**
+     * @return the mapped file allocation service held by this store
+     */
+    public AllocateMappedFileService getAllocateMappedFileService() {
+        return this.allocateMappedFileService;
+    }
+
+    /**
+     * @return the store statistics service held by this store
+     */
+    public StoreStatsService getStoreStatsService() {
+        return this.storeStatsService;
+    }
+
+    /**
+     * @return the store's running flags; the same object that {@code getRunningFlags()} returns
+     */
+    public RunningFlags getAccessRights() {
+        return this.runningFlags;
+    }
+
+    /**
+     * @return the live topic to (queue id to consume queue) table; not a copy
+     */
+    public ConcurrentHashMap<String, ConcurrentHashMap<Integer, ConsumeQueue>> getConsumeQueueTable() {
+        return this.consumeQueueTable;
+    }
+
+    /**
+     * @return the store checkpoint held by this store
+     */
+    public StoreCheckpoint getStoreCheckpoint() {
+        return this.storeCheckpoint;
+    }
+
+    /**
+     * @return the HA service held by this store
+     */
+    public HAService getHaService() {
+        return this.haService;
+    }
+
+    /**
+     * @return the schedule message service held by this store
+     */
+    public ScheduleMessageService getScheduleMessageService() {
+        return this.scheduleMessageService;
+    }
+
+    /**
+     * @return the store's running flags
+     */
+    public RunningFlags getRunningFlags() {
+        return this.runningFlags;
+    }
+
+    /**
+     * Dispatches one commit log entry. Entries whose transaction type is "not a transaction" or
+     * "commit" are written to their consume queue; prepared and rollback entries are not. The
+     * index is built for every entry when message indexing is enabled.
+     *
+     * @param req dispatch request describing the commit log entry
+     */
+    public void doDispatch(DispatchRequest req) {
+        final int tranType = MessageSysFlag.getTransactionValue(req.getSysFlag());
+        final boolean goesToConsumeQueue = tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
+                || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE;
+
+        if (goesToConsumeQueue) {
+            this.putMessagePositionInfo(req.getTopic(), req.getQueueId(), req.getCommitLogOffset(), req.getMsgSize(),
+                    req.getTagsCode(), req.getStoreTimestamp(), req.getConsumeQueueOffset());
+        }
+
+        if (this.getMessageStoreConfig().isMessageIndexEnable()) {
+            this.indexService.buildIndex(req);
+        }
+    }
+
+    /**
+     * Records a message's position in the consume queue of its topic and queue id; the queue is
+     * obtained through {@code findConsumeQueue}.
+     *
+     * @param topic          topic name
+     * @param queueId        queue id within the topic
+     * @param offset         offset of the message in the commit log
+     * @param size           message size
+     * @param tagsCode       tags code of the message
+     * @param storeTimestamp store timestamp of the message
+     * @param logicOffset    offset of the message within the consume queue
+     */
+    public void putMessagePositionInfo(String topic, int queueId, long offset, int size, long tagsCode, long storeTimestamp,
+                                       long logicOffset) {
+        this.findConsumeQueue(topic, queueId)
+                .putMessagePositionInfoWrapper(offset, size, tagsCode, storeTimestamp, logicOffset);
+    }
+
+    /**
+     * @return the broker statistics manager held by this store
+     */
+    public BrokerStatsManager getBrokerStatsManager() {
+        return this.brokerStatsManager;
+    }
+
+    /**
+     * Deletes expired commit log files. A pass deletes files when the configured delete time
+     * has come, when disk usage is above the configured ratio, or when a manual delete was
+     * requested; afterwards it periodically retries deleting the first file.
+     */
+    class CleanCommitLogService {
+
+        /** Number of cleaning passes a manual delete request stays in effect. */
+        private final static int MAX_MANUAL_DELETE_FILE_TIMES = 20;
+
+        /** Disk usage ratio above which the disk is marked full. */
+        private final double diskSpaceWarningLevelRatio =
+                Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceWarningLevelRatio", "0.90"));
+
+        /** Disk usage ratio above which immediate cleaning is requested. */
+        private final double diskSpaceCleanForciblyRatio =
+                Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceCleanForciblyRatio", "0.85"));
+
+        private long lastRedeleteTimestamp = 0;
+
+        private volatile int manualDeleteFileSeveralTimes = 0;
+
+        private volatile boolean cleanImmediately = false;
+
+        /**
+         * Requests deletion for the next {@code MAX_MANUAL_DELETE_FILE_TIMES} passes.
+         */
+        public void excuteDeleteFilesManualy() {
+            this.manualDeleteFileSeveralTimes = MAX_MANUAL_DELETE_FILE_TIMES;
+            DefaultMessageStore.log.info("excuteDeleteFilesManualy was invoked");
+        }
+
+        /**
+         * Runs one cleaning pass; exceptions are logged and not propagated.
+         */
+        public void run() {
+            try {
+                this.deleteExpiredFiles();
+
+                this.redeleteHangedFile();
+            } catch (Exception e) {
+                DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
+            }
+        }
+
+        /**
+         * Deletes expired commit log files when time, disk space or a manual request calls for it.
+         */
+        private void deleteExpiredFiles() {
+            final long reservedHours = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
+            final int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
+            final int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
+
+            // Both checks always run: isSpaceToDelete also updates cleanImmediately and the disk flags.
+            final boolean timeup = this.isTimeToDelete();
+            final boolean spacefull = this.isSpaceToDelete();
+            final boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;
+
+            if (!timeup && !spacefull && !manualDelete) {
+                return;
+            }
+
+            if (manualDelete) {
+                this.manualDeleteFileSeveralTimes--;
+            }
+
+            final boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;
+
+            log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}", //
+                    reservedHours, //
+                    timeup, //
+                    spacefull, //
+                    manualDeleteFileSeveralTimes, //
+                    cleanAtOnce);
+
+            // Hours to milliseconds.
+            final long reservedMillis = reservedHours * 60 * 60 * 1000;
+
+            final int deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(reservedMillis, deletePhysicFilesInterval,
+                    destroyMapedFileIntervalForcibly, cleanAtOnce);
+            if (deleteCount <= 0 && spacefull) {
+                log.warn("disk space will be full soon, but delete file failed.");
+            }
+        }
+
+        /**
+         * Retries deleting the first commit log file once the configured interval has elapsed
+         * since the previous retry.
+         */
+        private void redeleteHangedFile() {
+            final int interval = DefaultMessageStore.this.getMessageStoreConfig().getRedeleteHangedFileInterval();
+            final long currentTimestamp = System.currentTimeMillis();
+            if ((currentTimestamp - this.lastRedeleteTimestamp) <= interval) {
+                return;
+            }
+
+            this.lastRedeleteTimestamp = currentTimestamp;
+            final int destroyMapedFileIntervalForcibly =
+                    DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
+            // The result is intentionally unused.
+            DefaultMessageStore.this.commitLog.retryDeleteFirstFile(destroyMapedFileIntervalForcibly);
+        }
+
+        /**
+         * @return the simple class name of this service
+         */
+        public String getServiceName() {
+            return CleanCommitLogService.class.getSimpleName();
+        }
+
+        /**
+         * @return {@code true} when the configured delete time matches now
+         */
+        private boolean isTimeToDelete() {
+            final String when = DefaultMessageStore.this.getMessageStoreConfig().getDeleteWhen();
+            if (!UtilAll.isItTimeToDo(when)) {
+                return false;
+            }
+
+            DefaultMessageStore.log.info("it's time to reclaim disk space, " + when);
+            return true;
+        }
+
+        /**
+         * Checks disk usage of the commit log path and then of the consume queue path. As a side
+         * effect it resets and possibly sets {@code cleanImmediately} and updates the disk
+         * full/ok running flag.
+         *
+         * @return {@code true} when usage of either path is negative or above the configured max ratio
+         */
+        private boolean isSpaceToDelete() {
+            final double maxUsedRatio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;
+
+            cleanImmediately = false;
+
+            // Commit log partition.
+            {
+                final String commitLogPath = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
+                final double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(commitLogPath);
+                if (physicRatio > diskSpaceWarningLevelRatio) {
+                    final boolean wasOk = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
+                    if (wasOk) {
+                        DefaultMessageStore.log.error("physic disk maybe full soon " + physicRatio + ", so mark disk full");
+                    }
+
+                    cleanImmediately = true;
+                } else if (physicRatio > diskSpaceCleanForciblyRatio) {
+                    cleanImmediately = true;
+                } else {
+                    final boolean wasOk = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
+                    if (!wasOk) {
+                        DefaultMessageStore.log.info("physic disk space OK " + physicRatio + ", so mark disk ok");
+                    }
+                }
+
+                if (physicRatio < 0 || physicRatio > maxUsedRatio) {
+                    DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + physicRatio);
+                    return true;
+                }
+            }
+
+            // Consume queue partition.
+            {
+                final String consumeQueuePath = StorePathConfigHelper
+                        .getStorePathConsumeQueue(DefaultMessageStore.this.getMessageStoreConfig().getStorePathRootDir());
+                final double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent(consumeQueuePath);
+                if (logicsRatio > diskSpaceWarningLevelRatio) {
+                    final boolean wasOk = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
+                    if (wasOk) {
+                        DefaultMessageStore.log.error("logics disk maybe full soon " + logicsRatio + ", so mark disk full");
+                    }
+
+                    cleanImmediately = true;
+                } else if (logicsRatio > diskSpaceCleanForciblyRatio) {
+                    cleanImmediately = true;
+                } else {
+                    final boolean wasOk = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
+                    if (!wasOk) {
+                        DefaultMessageStore.log.info("logics disk space OK " + logicsRatio + ", so mark disk ok");
+                    }
+                }
+
+                if (logicsRatio < 0 || logicsRatio > maxUsedRatio) {
+                    DefaultMessageStore.log.info("logics disk maybe full soon, so reclaim space, " + logicsRatio);
+                    return true;
+                }
+            }
+
+            return false;
+        }
+
+        /**
+         * @return remaining number of passes a manual delete request is in effect
+         */
+        public int getManualDeleteFileSeveralTimes() {
+            return this.manualDeleteFileSeveralTimes;
+        }
+
+        /**
+         * @param manualDeleteFileSeveralTimes number of passes a manual delete request stays in effect
+         */
+        public void setManualDeleteFileSeveralTimes(int manualDeleteFileSeveralTimes) {
+            this.manualDeleteFileSeveralTimes = manualDeleteFileSeveralTimes;
+        }
+    }
+
+    /**
+     * Deletes consume queue and index files that refer to commit log data below the commit
+     * log's current min offset. Work is only done when that min offset has advanced since the
+     * previous pass.
+     */
+    class CleanConsumeQueueService {
+        /** Commit log min offset seen by the previous pass that did any work. */
+        private long lastPhysicalMinOffset = 0;
+
+        /**
+         * Runs one cleaning pass; exceptions are logged and not propagated.
+         */
+        public void run() {
+            try {
+                this.deleteExpiredFiles();
+            } catch (Exception e) {
+                DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
+            }
+        }
+
+        /**
+         * Deletes expired files of every consume queue, pausing for the configured interval after
+         * each queue that actually deleted something, then deletes expired index files.
+         */
+        private void deleteExpiredFiles() {
+            final int pauseMillis = DefaultMessageStore.this.getMessageStoreConfig().getDeleteConsumeQueueFilesInterval();
+
+            final long minOffset = DefaultMessageStore.this.commitLog.getMinOffset();
+            if (minOffset <= this.lastPhysicalMinOffset) {
+                return;
+            }
+            this.lastPhysicalMinOffset = minOffset;
+
+            for (ConcurrentHashMap<Integer, ConsumeQueue> queuesOfTopic : DefaultMessageStore.this.consumeQueueTable.values()) {
+                for (ConsumeQueue queue : queuesOfTopic.values()) {
+                    final int deleteCount = queue.deleteExpiredFile(minOffset);
+                    if (deleteCount <= 0 || pauseMillis <= 0) {
+                        continue;
+                    }
+
+                    try {
+                        Thread.sleep(pauseMillis);
+                    } catch (InterruptedException ignored) {
+                        // Interruption only cuts the pause short; cleaning continues.
+                    }
+                }
+            }
+
+            DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset);
+        }
+
+        /**
+         * @return the simple class name of this service
+         */
+        public String getServiceName() {
+            return CleanConsumeQueueService.class.getSimpleName();
+        }
+    }
+
+    /**
+     * Service thread that periodically flushes every consume queue and, on thorough flushes,
+     * the store checkpoint.
+     */
+    class FlushConsumeQueueService extends ServiceThread {
+        /** Retry count used for the final flush on shutdown; it also forces a thorough flush. */
+        private static final int RETRY_TIMES_OVER = 3;
+        private long lastFlushTimestamp = 0;
+
+        /**
+         * Flushes all consume queues.
+         *
+         * <p>The flush is thorough (least pages {@code 0}) when {@code retryTimes} equals
+         * {@code RETRY_TIMES_OVER} or when the configured thorough interval has elapsed. After a
+         * thorough flush the store checkpoint is flushed too.
+         *
+         * @param retryTimes maximum number of flush attempts per consume queue
+         */
+        private void doFlush(int retryTimes) {
+            int leastPages = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueLeastPages();
+            if (retryTimes == RETRY_TIMES_OVER) {
+                leastPages = 0;
+            }
+
+            long logicsMsgTimestamp = 0;
+
+            final int thoroughInterval = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueThoroughInterval();
+            final long currentTimeMillis = System.currentTimeMillis();
+            if (currentTimeMillis >= (this.lastFlushTimestamp + thoroughInterval)) {
+                this.lastFlushTimestamp = currentTimeMillis;
+                leastPages = 0;
+                // Captured before flushing and written back afterwards.
+                logicsMsgTimestamp = DefaultMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp();
+            }
+
+            for (ConcurrentHashMap<Integer, ConsumeQueue> queuesOfTopic : DefaultMessageStore.this.consumeQueueTable.values()) {
+                for (ConsumeQueue queue : queuesOfTopic.values()) {
+                    boolean flushed = false;
+                    int attempt = 0;
+                    while (!flushed && attempt < retryTimes) {
+                        flushed = queue.flush(leastPages);
+                        attempt++;
+                    }
+                }
+            }
+
+            if (leastPages != 0) {
+                return;
+            }
+
+            if (logicsMsgTimestamp > 0) {
+                DefaultMessageStore.this.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp);
+            }
+            DefaultMessageStore.this.getStoreCheckpoint().flush();
+        }
+
+        /**
+         * Flushes once per configured interval until stopped, then performs one final flush with
+         * {@code RETRY_TIMES_OVER} attempts per queue.
+         */
+        public void run() {
+            DefaultMessageStore.log.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    final int interval = DefaultMessageStore.this.getMessageStoreConfig().getFlushIntervalConsumeQueue();
+                    this.waitForRunning(interval);
+                    this.doFlush(1);
+                } catch (Exception e) {
+                    DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            this.doFlush(RETRY_TIMES_OVER);
+
+            DefaultMessageStore.log.info(this.getServiceName() + " service end");
+        }
+
+        /**
+         * @return the simple class name of this service
+         */
+        @Override
+        public String getServiceName() {
+            return FlushConsumeQueueService.class.getSimpleName();
+        }
+
+        /**
+         * @return the join time for this service, 60 * 1000
+         */
+        @Override
+        public long getJointime() {
+            return 60 * 1000;
+        }
+    }
+
+    /**
+     * Service thread that reads the commit log starting at {@code reputFromOffset} and passes
+     * every successfully checked message to {@code DefaultMessageStore.doDispatch}.
+     */
+    class ReputMessageService extends ServiceThread {
+
+        /** Commit log offset from which the next {@code doReput()} round reads. */
+        private volatile long reputFromOffset = 0;
+
+        public long getReputFromOffset() {
+            return reputFromOffset;
+        }
+
+        public void setReputFromOffset(long reputFromOffset) {
+            this.reputFromOffset = reputFromOffset;
+        }
+
+        /**
+         * @return the commit log's max offset minus {@code reputFromOffset}
+         */
+        public long behind() {
+            return DefaultMessageStore.this.commitLog.getMaxOffset() - this.reputFromOffset;
+        }
+
+        /**
+         * Waits up to 50 rounds of 100 ms while the commit log still holds data beyond
+         * {@code reputFromOffset}, logs a warning if it still does afterwards, and then
+         * delegates to {@code super.shutdown()}.
+         * <p>
+         * An interrupt received while waiting does not abort the wait; the interrupt status of
+         * the calling thread is restored before this method returns.
+         */
+        @Override
+        public void shutdown() {
+            boolean interrupted = false;
+            try {
+                for (int i = 0; i < 50 && this.isCommitLogAvailable(); i++) {
+                    try {
+                        Thread.sleep(100);
+                    } catch (InterruptedException e) {
+                        // Remember the interrupt and keep waiting; it is re-asserted below.
+                        interrupted = true;
+                    }
+                }
+
+                if (this.isCommitLogAvailable()) {
+                    log.warn("shutdown ReputMessageService, but commitlog have not finish to be dispatched, CL: {} reputFromOffset: {}",
+                            DefaultMessageStore.this.commitLog.getMaxOffset(), this.reputFromOffset);
+                }
+
+                super.shutdown();
+            } finally {
+                if (interrupted) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+
+        /**
+         * @return {@code true} while {@code reputFromOffset} is below the commit log's max offset
+         */
+        private boolean isCommitLogAvailable() {
+            return this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset();
+        }
+
+        /**
+         * Dispatches commit log data from {@code reputFromOffset} onwards. Each round fetches a
+         * buffer via {@code commitLog.getData}, checks the messages in it one by one and advances
+         * {@code reputFromOffset} accordingly. The method returns when no more data is available,
+         * no buffer is returned, the confirm offset is reached (duplication mode) or a message
+         * check fails without a positive size.
+         */
+        private void doReput() {
+            for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
+
+                // With duplication enabled, never dispatch at or beyond the store's confirm offset.
+                if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() //
+                        && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
+                    break;
+                }
+
+                SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
+                if (result != null) {
+                    try {
+                        // Continue from the start offset of the buffer that was actually returned.
+                        this.reputFromOffset = result.getStartOffset();
+
+                        for (int readSize = 0; readSize < result.getSize() && doNext; ) {
+                            DispatchRequest dispatchRequest =
+                                    DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
+                            int size = dispatchRequest.getMsgSize();
+
+                            if (dispatchRequest.isSuccess()) {
+                                if (size > 0) {
+                                    DefaultMessageStore.this.doDispatch(dispatchRequest);
+
+                                    // Notify the arriving listener unless this broker is a slave
+                                    // or long polling is disabled.
+                                    if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
+                                            && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
+                                        DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
+                                                dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
+                                                dispatchRequest.getTagsCode());
+                                    }
+                                    // Advance both cursors by the size of the message just dispatched.
+                                    this.reputFromOffset += size;
+                                    readSize += size;
+                                    // On a slave, the per-topic put statistics are updated here.
+                                    if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
+                                        DefaultMessageStore.this.storeStatsService
+                                                .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
+                                        DefaultMessageStore.this.storeStatsService
+                                                .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
+                                                .addAndGet(dispatchRequest.getMsgSize());
+                                    }
+                                } else if (size == 0) {
+                                    // Successful check with size 0: jump to the offset returned by
+                                    // rollNextFile and stop reading the current buffer.
+                                    this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
+                                    readSize = result.getSize();
+                                }
+                            } else {
+                                // The check failed (isSuccess() is false).
+                                if (size > 0) {
+                                    log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
+                                    // NOTE(review): only reputFromOffset is advanced here, readSize is
+                                    // not - confirm this is intended.
+                                    this.reputFromOffset += size;
+                                } else {
+                                    doNext = false;
+                                    // Only a broker whose id is MASTER_ID skips the rest of the buffer.
+                                    if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
+                                        log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}",
+                                                this.reputFromOffset);
+
+                                        this.reputFromOffset += result.getSize() - readSize;
+                                    }
+                                }
+                            }
+                        }
+                    } finally {
+                        result.release();
+                    }
+                } else {
+                    doNext = false;
+                }
+            }
+        }
+
+        /**
+         * Main loop of this service: until the service is stopped, sleeps 1 ms and runs
+         * {@code doReput()}. Any exception thrown inside a round is logged and the loop goes on.
+         */
+        @Override
+        public void run() {
+            DefaultMessageStore.log.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    Thread.sleep(1);
+                    this.doReput();
+                } catch (Exception e) {
+                    DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            DefaultMessageStore.log.info(this.getServiceName() + " service end");
+        }
+
+        /**
+         * @return the simple class name of {@code ReputMessageService}
+         */
+        @Override
+        public String getServiceName() {
+            return ReputMessageService.class.getSimpleName();
+        }
+
+    }
+
+    /**
+     * Delegates to the transient store pool.
+     *
+     * @return the value of {@code transientStorePool.remainBufferNumbs()}
+     */
+    public int remainTransientStoreBufferNumbs() {
+        final int remaining = this.transientStorePool.remainBufferNumbs();
+        return remaining;
+    }
+
+    /**
+     * @return {@code true} when {@code remainTransientStoreBufferNumbs()} reports exactly zero
+     */
+    @Override
+    public boolean isTransientStorePoolDeficient() {
+        final int remaining = this.remainTransientStoreBufferNumbs();
+        return 0 == remaining;
+    }
+
+
+    /**
+     * Schedules a single {@code mappedFile.munlock()} call on the store's scheduled executor,
+     * to run 6 seconds from now.
+     *
+     * @param mappedFile the mapped file whose {@code munlock()} is invoked by the scheduled task
+     */
+    public void unlockMappedFile(final MappedFile mappedFile) {
+        final Runnable munlockTask = new Runnable() {
+            @Override
+            public void run() {
+                mappedFile.munlock();
+            }
+        };
+        final long delaySeconds = 6;
+        this.scheduledExecutorService.schedule(munlockTask, delaySeconds, TimeUnit.SECONDS);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/DispatchRequest.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/DispatchRequest.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/DispatchRequest.java
new file mode 100644
index 0000000..4f2c1a9
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/DispatchRequest.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.store;
+
+/**
+ * Immutable value object describing one message read from the commit log, together with a
+ * {@code success} flag. The full constructor always yields {@code success == true}; the two
+ * short constructors build a placeholder that carries only a size and a success flag.
+ *
+ * @author shijia.wxr
+ */
+public class DispatchRequest {
+    private final String topic;
+    private final int queueId;
+    private final long commitLogOffset;
+    private final int msgSize;
+    private final long tagsCode;
+    private final long storeTimestamp;
+    private final long consumeQueueOffset;
+    private final String keys;
+    private final boolean success;
+    private final String uniqKey;
+
+    private final int sysFlag;
+    private final long preparedTransactionOffset;
+
+    /**
+     * Creates a fully populated request; {@link #isSuccess()} returns {@code true}.
+     */
+    public DispatchRequest(
+            final String topic,
+            final int queueId,
+            final long commitLogOffset,
+            final int msgSize,
+            final long tagsCode,
+            final long storeTimestamp,
+            final long consumeQueueOffset,
+            final String keys,
+            final String uniqKey,
+            final int sysFlag,
+            final long preparedTransactionOffset
+    ) {
+        this.success = true;
+
+        this.topic = topic;
+        this.queueId = queueId;
+        this.commitLogOffset = commitLogOffset;
+        this.msgSize = msgSize;
+        this.tagsCode = tagsCode;
+        this.storeTimestamp = storeTimestamp;
+        this.consumeQueueOffset = consumeQueueOffset;
+        this.keys = keys;
+        this.uniqKey = uniqKey;
+        this.sysFlag = sysFlag;
+        this.preparedTransactionOffset = preparedTransactionOffset;
+    }
+
+    /**
+     * Creates a placeholder with the given size and {@code success == false}.
+     *
+     * @param size value returned by {@link #getMsgSize()}
+     */
+    public DispatchRequest(int size) {
+        this(size, false);
+    }
+
+    /**
+     * Creates a placeholder: empty topic and keys, {@code null} uniqKey, every numeric field
+     * zero except the size.
+     *
+     * @param size    value returned by {@link #getMsgSize()}
+     * @param success value returned by {@link #isSuccess()}
+     */
+    public DispatchRequest(int size, boolean success) {
+        this.msgSize = size;
+        this.success = success;
+
+        this.topic = "";
+        this.keys = "";
+        this.uniqKey = null;
+
+        this.queueId = 0;
+        this.commitLogOffset = 0;
+        this.tagsCode = 0;
+        this.storeTimestamp = 0;
+        this.consumeQueueOffset = 0;
+        this.sysFlag = 0;
+        this.preparedTransactionOffset = 0;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public int getQueueId() {
+        return queueId;
+    }
+
+    public long getCommitLogOffset() {
+        return commitLogOffset;
+    }
+
+    public int getMsgSize() {
+        return msgSize;
+    }
+
+    public long getStoreTimestamp() {
+        return storeTimestamp;
+    }
+
+    public long getConsumeQueueOffset() {
+        return consumeQueueOffset;
+    }
+
+    public String getKeys() {
+        return keys;
+    }
+
+    public long getTagsCode() {
+        return tagsCode;
+    }
+
+    public int getSysFlag() {
+        return sysFlag;
+    }
+
+    public long getPreparedTransactionOffset() {
+        return preparedTransactionOffset;
+    }
+
+    public boolean isSuccess() {
+        return success;
+    }
+
+    public String getUniqKey() {
+        return uniqKey;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/GetMessageResult.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/GetMessageResult.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/GetMessageResult.java
new file mode 100644
index 0000000..05a0003
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/GetMessageResult.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.store;
+
+import com.alibaba.rocketmq.store.stats.BrokerStatsManager;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * Mutable holder for the outcome of a message lookup: a status, offset information and the
+ * selected buffers. Buffers added through {@link #addMessage(SelectMappedBufferResult)} are
+ * released together by {@link #release()}.
+ *
+ * @author shijia.wxr
+ */
+public class GetMessageResult {
+
+    /** Every selection added so far; iterated by {@link #release()}. */
+    private final List<SelectMappedBufferResult> messageMapedList =
+            new ArrayList<SelectMappedBufferResult>(100);
+
+    /** The byte buffer of each added selection, in the same order as {@code messageMapedList}. */
+    private final List<ByteBuffer> messageBufferList = new ArrayList<ByteBuffer>(100);
+
+    private GetMessageStatus status;
+    private long nextBeginOffset;
+    private long minOffset;
+    private long maxOffset;
+
+    /** Sum of {@code getSize()} over all added selections, unless overwritten by the setter. */
+    private int bufferTotalSize = 0;
+
+    private boolean suggestPullingFromSlave = false;
+
+    /** Per added selection: its size divided by {@code SIZE_PER_COUNT}, rounded up, accumulated. */
+    private int msgCount4Commercial = 0;
+
+
+    public GetMessageResult() {
+    }
+
+
+    public GetMessageStatus getStatus() {
+        return status;
+    }
+
+
+    public void setStatus(GetMessageStatus status) {
+        this.status = status;
+    }
+
+
+    public long getNextBeginOffset() {
+        return nextBeginOffset;
+    }
+
+
+    public void setNextBeginOffset(long nextBeginOffset) {
+        this.nextBeginOffset = nextBeginOffset;
+    }
+
+
+    public long getMinOffset() {
+        return minOffset;
+    }
+
+
+    public void setMinOffset(long minOffset) {
+        this.minOffset = minOffset;
+    }
+
+
+    public long getMaxOffset() {
+        return maxOffset;
+    }
+
+
+    public void setMaxOffset(long maxOffset) {
+        this.maxOffset = maxOffset;
+    }
+
+
+    /**
+     * @return the live internal list of added selections (not a copy)
+     */
+    public List<SelectMappedBufferResult> getMessageMapedList() {
+        return messageMapedList;
+    }
+
+
+    /**
+     * @return the live internal list of byte buffers (not a copy)
+     */
+    public List<ByteBuffer> getMessageBufferList() {
+        return messageBufferList;
+    }
+
+
+    /**
+     * Records one selected buffer: stores the selection and its byte buffer, adds its size to
+     * the running total and adds {@code ceil(size / SIZE_PER_COUNT)} to the commercial count.
+     *
+     * @param mapedBuffer the selection to add; must not be {@code null}
+     */
+    public void addMessage(final SelectMappedBufferResult mapedBuffer) {
+        this.messageMapedList.add(mapedBuffer);
+        this.messageBufferList.add(mapedBuffer.getByteBuffer());
+        this.bufferTotalSize += mapedBuffer.getSize();
+        // Divide in floating point explicitly: after an integer division Math.ceil would be a
+        // no-op and partially filled units would be dropped instead of rounded up.
+        this.msgCount4Commercial += (int) Math.ceil(
+                mapedBuffer.getSize() / (double) BrokerStatsManager.SIZE_PER_COUNT);
+    }
+
+
+    /**
+     * Calls {@code release()} on every selection added so far. The lists themselves are left
+     * unchanged.
+     */
+    public void release() {
+        for (SelectMappedBufferResult select : this.messageMapedList) {
+            select.release();
+        }
+    }
+
+
+    public int getBufferTotalSize() {
+        return bufferTotalSize;
+    }
+
+
+    public void setBufferTotalSize(int bufferTotalSize) {
+        this.bufferTotalSize = bufferTotalSize;
+    }
+
+
+    /**
+     * @return the number of selections added so far
+     */
+    public int getMessageCount() {
+        return this.messageMapedList.size();
+    }
+
+
+    public boolean isSuggestPullingFromSlave() {
+        return suggestPullingFromSlave;
+    }
+
+
+    public void setSuggestPullingFromSlave(boolean suggestPullingFromSlave) {
+        this.suggestPullingFromSlave = suggestPullingFromSlave;
+    }
+
+    public int getMsgCount4Commercial() {
+        return msgCount4Commercial;
+    }
+
+    public void setMsgCount4Commercial(int msgCount4Commercial) {
+        this.msgCount4Commercial = msgCount4Commercial;
+    }
+
+
+    @Override
+    public String toString() {
+        return "GetMessageResult [status=" + status + ", nextBeginOffset=" + nextBeginOffset + ", minOffset="
+                + minOffset + ", maxOffset=" + maxOffset + ", bufferTotalSize=" + bufferTotalSize
+                + ", suggestPullingFromSlave=" + suggestPullingFromSlave + "]";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/GetMessageStatus.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/GetMessageStatus.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/GetMessageStatus.java
new file mode 100644
index 0000000..87d6fe0
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/GetMessageStatus.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.store;
+
+/**
+ * Status values that can be stored in a {@code GetMessageResult}.
+ * <p>
+ * The declaration order is kept as is, so the {@code ordinal()} of every constant is unchanged.
+ *
+ * @author shijia.wxr
+ */
+public enum GetMessageStatus {
+    FOUND,
+    NO_MATCHED_MESSAGE,
+    MESSAGE_WAS_REMOVING,
+    OFFSET_FOUND_NULL,
+    OFFSET_OVERFLOW_BADLY,
+    OFFSET_OVERFLOW_ONE,
+    OFFSET_TOO_SMALL,
+    NO_MATCHED_LOGIC_QUEUE,
+    NO_MESSAGE_IN_QUEUE
+}