You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/19 09:40:24 UTC

[07/43] incubator-rocketmq git commit: Finish code dump. Reviewed by: @yukon @vongosling @stevenschew @vintagewang @lollipop @zander

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/HAService.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/HAService.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/HAService.java
new file mode 100644
index 0000000..00f9833
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/HAService.java
@@ -0,0 +1,654 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.store.ha;
+
+import com.alibaba.rocketmq.common.ServiceThread;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.remoting.common.RemotingUtil;
+import com.alibaba.rocketmq.store.CommitLog.GroupCommitRequest;
+import com.alibaba.rocketmq.store.DefaultMessageStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.*;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class HAService {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+
+    private final AtomicInteger connectionCount = new AtomicInteger(0);
+
+    private final List<HAConnection> connectionList = new LinkedList<HAConnection>();
+
+    private final AcceptSocketService acceptSocketService;
+
+    private final DefaultMessageStore defaultMessageStore;
+
+    private final WaitNotifyObject waitNotifyObject = new WaitNotifyObject();
+    private final AtomicLong push2SlaveMaxOffset = new AtomicLong(0);
+
+    private final GroupTransferService groupTransferService;
+
+    private final HAClient haClient;
+
+
+    public HAService(final DefaultMessageStore defaultMessageStore) throws IOException {
+        this.defaultMessageStore = defaultMessageStore;
+        this.acceptSocketService =
+                new AcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());
+        this.groupTransferService = new GroupTransferService();
+        this.haClient = new HAClient();
+    }
+
+
+    public void updateMasterAddress(final String newAddr) {
+        if (this.haClient != null) {
+            this.haClient.updateMasterAddress(newAddr);
+        }
+    }
+
+
+    public void putRequest(final GroupCommitRequest request) {
+        this.groupTransferService.putRequest(request);
+    }
+
+
+    public boolean isSlaveOK(final long masterPutWhere) {
+        boolean result = this.connectionCount.get() > 0;
+        result =
+                result
+                        && ((masterPutWhere - this.push2SlaveMaxOffset.get()) < this.defaultMessageStore
+                        .getMessageStoreConfig().getHaSlaveFallbehindMax());
+        return result;
+    }
+
+
+    /**
+
+     */
+    public void notifyTransferSome(final long offset) {
+        for (long value = this.push2SlaveMaxOffset.get(); offset > value; ) {
+            boolean ok = this.push2SlaveMaxOffset.compareAndSet(value, offset);
+            if (ok) {
+                this.groupTransferService.notifyTransferSome();
+                break;
+            } else {
+                value = this.push2SlaveMaxOffset.get();
+            }
+        }
+    }
+
+
+    public AtomicInteger getConnectionCount() {
+        return connectionCount;
+    }
+
+
+    // public void notifyTransferSome() {
+    // this.groupTransferService.notifyTransferSome();
+    // }
+
+    public void start() {
+        this.acceptSocketService.beginAccept();
+        this.acceptSocketService.start();
+        this.groupTransferService.start();
+        this.haClient.start();
+    }
+
+
+    public void addConnection(final HAConnection conn) {
+        synchronized (this.connectionList) {
+            this.connectionList.add(conn);
+        }
+    }
+
+
+    public void removeConnection(final HAConnection conn) {
+        synchronized (this.connectionList) {
+            this.connectionList.remove(conn);
+        }
+    }
+
+
+    public void shutdown() {
+        this.haClient.shutdown();
+        this.acceptSocketService.shutdown(true);
+        this.destroyConnections();
+        this.groupTransferService.shutdown();
+    }
+
+
+    public void destroyConnections() {
+        synchronized (this.connectionList) {
+            for (HAConnection c : this.connectionList) {
+                c.shutdown();
+            }
+
+            this.connectionList.clear();
+        }
+    }
+
+
+    public DefaultMessageStore getDefaultMessageStore() {
+        return defaultMessageStore;
+    }
+
+
+    public WaitNotifyObject getWaitNotifyObject() {
+        return waitNotifyObject;
+    }
+
+    public AtomicLong getPush2SlaveMaxOffset() {
+        return push2SlaveMaxOffset;
+    }
+
+    class AcceptSocketService extends ServiceThread {
+        private ServerSocketChannel serverSocketChannel;
+        private Selector selector;
+        private SocketAddress socketAddressListen;
+
+
+        public AcceptSocketService(final int port) {
+            this.socketAddressListen = new InetSocketAddress(port);
+        }
+
+
+        public void beginAccept() {
+            try {
+                this.serverSocketChannel = ServerSocketChannel.open();
+                this.selector = RemotingUtil.openSelector();
+                this.serverSocketChannel.socket().setReuseAddress(true);
+                this.serverSocketChannel.socket().bind(this.socketAddressListen);
+                this.serverSocketChannel.configureBlocking(false);
+                this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
+            } catch (Exception e) {
+                log.error("beginAccept exception", e);
+            }
+        }
+
+
+        @Override
+        public void run() {
+            log.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    this.selector.select(1000);
+                    Set<SelectionKey> selected = this.selector.selectedKeys();
+                    if (selected != null) {
+                        for (SelectionKey k : selected) {
+                            if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
+                                SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
+                                if (sc != null) {
+                                    HAService.log.info("HAService receive new connection, "
+                                            + sc.socket().getRemoteSocketAddress());
+
+                                    try {
+                                        HAConnection conn = new HAConnection(HAService.this, sc);
+                                        conn.start();
+                                        HAService.this.addConnection(conn);
+                                    } catch (Exception e) {
+                                        log.error("new HAConnection exception", e);
+                                        sc.close();
+                                    }
+                                }
+                            } else {
+                                log.warn("Unexpected ops in select " + k.readyOps());
+                            }
+                        }
+
+                        selected.clear();
+                    }
+
+                } catch (Exception e) {
+                    log.error(this.getServiceName() + " service has exception.", e);
+                }
+            }
+
+            log.error(this.getServiceName() + " service end");
+        }
+
+
+        @Override
+        public String getServiceName() {
+            return AcceptSocketService.class.getSimpleName();
+        }
+    }
+
+    /**
+     * GroupTransferService Service
+     */
+    class GroupTransferService extends ServiceThread {
+
+        private final WaitNotifyObject notifyTransferObject = new WaitNotifyObject();
+        private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();
+        private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();
+
+
+        public void putRequest(final GroupCommitRequest request) {
+            synchronized (this) {
+                this.requestsWrite.add(request);
+                if (hasNotified.compareAndSet(false, true)) {
+                    waitPoint.countDown(); // notify
+                }
+            }
+        }
+
+
+        public void notifyTransferSome() {
+            this.notifyTransferObject.wakeup();
+        }
+
+
+        private void swapRequests() {
+            List<GroupCommitRequest> tmp = this.requestsWrite;
+            this.requestsWrite = this.requestsRead;
+            this.requestsRead = tmp;
+        }
+
+
+        private void doWaitTransfer() {
+            if (!this.requestsRead.isEmpty()) {
+                for (GroupCommitRequest req : this.requestsRead) {
+                    boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
+                    for (int i = 0; !transferOK && i < 5; i++) {
+                        this.notifyTransferObject.waitForRunning(1000);
+                        transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
+                    }
+
+                    if (!transferOK) {
+                        log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
+                    }
+
+                    req.wakeupCustomer(transferOK);
+                }
+
+                this.requestsRead.clear();
+            }
+        }
+
+
+        public void run() {
+            log.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    this.waitForRunning(0);
+                    this.doWaitTransfer();
+                } catch (Exception e) {
+                    log.warn(this.getServiceName() + " service has exception. ", e);
+                }
+            }
+
+            log.info(this.getServiceName() + " service end");
+        }
+
+
+        @Override
+        protected void onWaitEnd() {
+            this.swapRequests();
+        }
+
+
+        @Override
+        public String getServiceName() {
+            return GroupTransferService.class.getSimpleName();
+        }
+    }
+
+    class HAClient extends ServiceThread {
+        private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4;
+        private final AtomicReference<String> masterAddress = new AtomicReference<String>();
+        private final ByteBuffer reportOffset = ByteBuffer.allocate(8);
+        private SocketChannel socketChannel;
+        private Selector selector;
+        private long lastWriteTimestamp = System.currentTimeMillis();
+
+        private long currentReportedOffset = 0;
+        private int dispatchPostion = 0;
+        private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
+        private ByteBuffer byteBufferBackup = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
+
+
+        public HAClient() throws IOException {
+            this.selector = RemotingUtil.openSelector();
+        }
+
+
+        public void updateMasterAddress(final String newAddr) {
+            String currentAddr = this.masterAddress.get();
+            if (currentAddr == null || !currentAddr.equals(newAddr)) {
+                this.masterAddress.set(newAddr);
+                log.info("update master address, OLD: " + currentAddr + " NEW: " + newAddr);
+            }
+        }
+
+
+        private boolean isTimeToReportOffset() {
+            long interval =
+                    HAService.this.defaultMessageStore.getSystemClock().now() - this.lastWriteTimestamp;
+            boolean needHeart = interval > HAService.this.defaultMessageStore.getMessageStoreConfig()
+                    .getHaSendHeartbeatInterval();
+
+            return needHeart;
+        }
+
+
+        private boolean reportSlaveMaxOffset(final long maxOffset) {
+            this.reportOffset.position(0);
+            this.reportOffset.limit(8);
+            this.reportOffset.putLong(maxOffset);
+            this.reportOffset.position(0);
+            this.reportOffset.limit(8);
+
+            for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
+                try {
+                    this.socketChannel.write(this.reportOffset);
+                } catch (IOException e) {
+                    log.error(this.getServiceName()
+                            + "reportSlaveMaxOffset this.socketChannel.write exception", e);
+                    return false;
+                }
+            }
+
+            return !this.reportOffset.hasRemaining();
+        }
+
+
+        // private void reallocateByteBuffer() {
+        // ByteBuffer bb = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
+        // int remain = this.byteBufferRead.limit() - this.dispatchPostion;
+        // bb.put(this.byteBufferRead.array(), this.dispatchPostion, remain);
+        // this.dispatchPostion = 0;
+        // this.byteBufferRead = bb;
+        // }
+
+        /**
+
+         */
+        private void reallocateByteBuffer() {
+            int remain = READ_MAX_BUFFER_SIZE - this.dispatchPostion;
+            if (remain > 0) {
+                this.byteBufferRead.position(this.dispatchPostion);
+
+                this.byteBufferBackup.position(0);
+                this.byteBufferBackup.limit(READ_MAX_BUFFER_SIZE);
+                this.byteBufferBackup.put(this.byteBufferRead);
+            }
+
+            this.swapByteBuffer();
+
+            this.byteBufferRead.position(remain);
+            this.byteBufferRead.limit(READ_MAX_BUFFER_SIZE);
+            this.dispatchPostion = 0;
+        }
+
+
+        private void swapByteBuffer() {
+            ByteBuffer tmp = this.byteBufferRead;
+            this.byteBufferRead = this.byteBufferBackup;
+            this.byteBufferBackup = tmp;
+        }
+
+
+        private boolean processReadEvent() {
+            int readSizeZeroTimes = 0;
+            while (this.byteBufferRead.hasRemaining()) {
+                try {
+                    int readSize = this.socketChannel.read(this.byteBufferRead);
+                    if (readSize > 0) {
+                        lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
+                        readSizeZeroTimes = 0;
+                        boolean result = this.dispatchReadRequest();
+                        if (!result) {
+                            log.error("HAClient, dispatchReadRequest error");
+                            return false;
+                        }
+                    } else if (readSize == 0) {
+                        if (++readSizeZeroTimes >= 3) {
+                            break;
+                        }
+                    } else {
+                        // TODO ERROR
+                        log.info("HAClient, processReadEvent read socket < 0");
+                        return false;
+                    }
+                } catch (IOException e) {
+                    log.info("HAClient, processReadEvent read socket exception", e);
+                    return false;
+                }
+            }
+
+            return true;
+        }
+
+
+        private boolean dispatchReadRequest() {
+            final int msgHeaderSize = 8 + 4; // phyoffset + size
+            int readSocketPos = this.byteBufferRead.position();
+
+            while (true) {
+                int diff = this.byteBufferRead.position() - this.dispatchPostion;
+                if (diff >= msgHeaderSize) {
+                    long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPostion);
+                    int bodySize = this.byteBufferRead.getInt(this.dispatchPostion + 8);
+
+                    long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
+
+
+                    if (slavePhyOffset != 0) {
+                        if (slavePhyOffset != masterPhyOffset) {
+                            log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
+                                    + slavePhyOffset + " MASTER: " + masterPhyOffset);
+                            return false;
+                        }
+                    }
+
+
+                    if (diff >= (msgHeaderSize + bodySize)) {
+                        byte[] bodyData = new byte[bodySize];
+                        this.byteBufferRead.position(this.dispatchPostion + msgHeaderSize);
+                        this.byteBufferRead.get(bodyData);
+
+
+                        HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);
+
+                        this.byteBufferRead.position(readSocketPos);
+                        this.dispatchPostion += msgHeaderSize + bodySize;
+
+                        if (!reportSlaveMaxOffsetPlus()) {
+                            return false;
+                        }
+
+                        continue;
+                    }
+                }
+
+                if (!this.byteBufferRead.hasRemaining()) {
+                    this.reallocateByteBuffer();
+                }
+
+                break;
+            }
+
+            return true;
+        }
+
+
+        private boolean reportSlaveMaxOffsetPlus() {
+            boolean result = true;
+            long currentPhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
+            if (currentPhyOffset > this.currentReportedOffset) {
+                this.currentReportedOffset = currentPhyOffset;
+                result = this.reportSlaveMaxOffset(this.currentReportedOffset);
+                if (!result) {
+                    this.closeMaster();
+                    log.error("HAClient, reportSlaveMaxOffset error, " + this.currentReportedOffset);
+                }
+            }
+
+            return result;
+        }
+
+
+        private boolean connectMaster() throws ClosedChannelException {
+            if (null == socketChannel) {
+                String addr = this.masterAddress.get();
+                if (addr != null) {
+
+                    SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
+                    if (socketAddress != null) {
+                        this.socketChannel = RemotingUtil.connect(socketAddress);
+                        if (this.socketChannel != null) {
+                            this.socketChannel.register(this.selector, SelectionKey.OP_READ);
+                        }
+                    }
+                }
+
+                this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
+
+                this.lastWriteTimestamp = System.currentTimeMillis();
+            }
+
+            return this.socketChannel != null;
+        }
+
+
+        private void closeMaster() {
+            if (null != this.socketChannel) {
+                try {
+
+                    SelectionKey sk = this.socketChannel.keyFor(this.selector);
+                    if (sk != null) {
+                        sk.cancel();
+                    }
+
+                    this.socketChannel.close();
+
+                    this.socketChannel = null;
+                } catch (IOException e) {
+                    log.warn("closeMaster exception. ", e);
+                }
+
+                this.lastWriteTimestamp = 0;
+                this.dispatchPostion = 0;
+
+                this.byteBufferBackup.position(0);
+                this.byteBufferBackup.limit(READ_MAX_BUFFER_SIZE);
+
+                this.byteBufferRead.position(0);
+                this.byteBufferRead.limit(READ_MAX_BUFFER_SIZE);
+            }
+        }
+
+
+        @Override
+        public void run() {
+            log.info(this.getServiceName() + " service started");
+
+            while (!this.isStopped()) {
+                try {
+                    if (this.connectMaster()) {
+
+                        if (this.isTimeToReportOffset()) {
+                            boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
+                            if (!result) {
+                                this.closeMaster();
+                            }
+                        }
+
+
+                        this.selector.select(1000);
+
+
+                        boolean ok = this.processReadEvent();
+                        if (!ok) {
+                            this.closeMaster();
+                        }
+
+                        if (!reportSlaveMaxOffsetPlus()) {
+                            continue;
+                        }
+
+
+                        long interval =
+                                HAService.this.getDefaultMessageStore().getSystemClock().now()
+                                        - this.lastWriteTimestamp;
+                        if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
+                                .getHaHousekeepingInterval()) {
+                            log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
+                                    + "] expired, " + interval);
+                            this.closeMaster();
+                            log.warn("HAClient, master not response some time, so close connection");
+                        }
+                    } else {
+                        this.waitForRunning(1000 * 5);
+                    }
+                } catch (Exception e) {
+                    log.warn(this.getServiceName() + " service has exception. ", e);
+                    this.waitForRunning(1000 * 5);
+                }
+            }
+
+            log.info(this.getServiceName() + " service end");
+        }
+
+
+        //
+        // private void disableWriteFlag() {
+        // if (this.socketChannel != null) {
+        // SelectionKey sk = this.socketChannel.keyFor(this.selector);
+        // if (sk != null) {
+        // int ops = sk.interestOps();
+        // ops &= ~SelectionKey.OP_WRITE;
+        // sk.interestOps(ops);
+        // }
+        // }
+        // }
+        //
+        //
+        // private void enableWriteFlag() {
+        // if (this.socketChannel != null) {
+        // SelectionKey sk = this.socketChannel.keyFor(this.selector);
+        // if (sk != null) {
+        // int ops = sk.interestOps();
+        // ops |= SelectionKey.OP_WRITE;
+        // sk.interestOps(ops);
+        // }
+        // }
+        // }
+
+        @Override
+        public String getServiceName() {
+            return HAClient.class.getSimpleName();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/WaitNotifyObject.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/WaitNotifyObject.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/WaitNotifyObject.java
new file mode 100644
index 0000000..f540cdb
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/ha/WaitNotifyObject.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.store.ha;
+
+import java.util.HashMap;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class WaitNotifyObject {
+
+    protected final HashMap<Long/* thread id */, Boolean/* notified */> waitingThreadTable =
+            new HashMap<Long, Boolean>(16);
+
+    protected volatile boolean hasNotified = false;
+
+
+    public void wakeup() {
+        synchronized (this) {
+            if (!this.hasNotified) {
+                this.hasNotified = true;
+                this.notify();
+            }
+        }
+    }
+
+
+    protected void waitForRunning(long interval) {
+        synchronized (this) {
+            if (this.hasNotified) {
+                this.hasNotified = false;
+                this.onWaitEnd();
+                return;
+            }
+
+            try {
+                this.wait(interval);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            } finally {
+                this.hasNotified = false;
+                this.onWaitEnd();
+            }
+        }
+    }
+
+
+    protected void onWaitEnd() {
+    }
+
+    public void wakeupAll() {
+        synchronized (this) {
+            boolean needNotify = false;
+
+            for (Boolean value : this.waitingThreadTable.values()) {
+                needNotify = needNotify || !value;
+                value = true;
+            }
+
+            if (needNotify) {
+                this.notifyAll();
+            }
+        }
+    }
+
+    public void allWaitForRunning(long interval) {
+        long currentThreadId = Thread.currentThread().getId();
+        synchronized (this) {
+            Boolean notified = this.waitingThreadTable.get(currentThreadId);
+            if (notified != null && notified) {
+                this.waitingThreadTable.put(currentThreadId, false);
+                this.onWaitEnd();
+                return;
+            }
+
+            try {
+                this.wait(interval);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            } finally {
+                this.waitingThreadTable.put(currentThreadId, false);
+                this.onWaitEnd();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexFile.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexFile.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexFile.java
new file mode 100644
index 0000000..f353320
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexFile.java
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.store.index;
+
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.store.MappedFile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.util.List;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class IndexFile {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+    private static int hashSlotSize = 4;
+    private static int indexSize = 20;
+    private static int invalidIndex = 0;
+    private final int hashSlotNum;
+    private final int indexNum;
+    private final MappedFile mappedFile;
+    private final FileChannel fileChannel;
+    private final MappedByteBuffer mappedByteBuffer;
+    private final IndexHeader indexHeader;
+
+
+    public IndexFile(final String fileName, final int hashSlotNum, final int indexNum,
+                     final long endPhyOffset, final long endTimestamp) throws IOException {
+        int fileTotalSize =
+                IndexHeader.INDEX_HEADER_SIZE + (hashSlotNum * hashSlotSize) + (indexNum * indexSize);
+        this.mappedFile = new MappedFile(fileName, fileTotalSize);
+        this.fileChannel = this.mappedFile.getFileChannel();
+        this.mappedByteBuffer = this.mappedFile.getMappedByteBuffer();
+        this.hashSlotNum = hashSlotNum;
+        this.indexNum = indexNum;
+
+        ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
+        this.indexHeader = new IndexHeader(byteBuffer);
+
+        if (endPhyOffset > 0) {
+            this.indexHeader.setBeginPhyOffset(endPhyOffset);
+            this.indexHeader.setEndPhyOffset(endPhyOffset);
+        }
+
+        if (endTimestamp > 0) {
+            this.indexHeader.setBeginTimestamp(endTimestamp);
+            this.indexHeader.setEndTimestamp(endTimestamp);
+        }
+    }
+
+
+    public String getFileName() {
+        return this.mappedFile.getFileName();
+    }
+
+
+    public void load() {
+        this.indexHeader.load();
+    }
+
+
+    public void flush() {
+        long beginTime = System.currentTimeMillis();
+        if (this.mappedFile.hold()) {
+            this.indexHeader.updateByteBuffer();
+            this.mappedByteBuffer.force();
+            this.mappedFile.release();
+            log.info("flush index file eclipse time(ms) " + (System.currentTimeMillis() - beginTime));
+        }
+    }
+
+    public boolean isWriteFull() {
+        return this.indexHeader.getIndexCount() >= this.indexNum;
+    }
+
+
+    public boolean destroy(final long intervalForcibly) {
+        return this.mappedFile.destroy(intervalForcibly);
+    }
+
+    public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
+        if (this.indexHeader.getIndexCount() < this.indexNum) {
+            int keyHash = indexKeyHashMethod(key);
+            int slotPos = keyHash % this.hashSlotNum;
+            int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
+
+            FileLock fileLock = null;
+
+            try {
+
+                // fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize,
+                // false);
+                int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
+                if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
+                    slotValue = invalidIndex;
+                }
+
+                long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
+
+
+                timeDiff = timeDiff / 1000;
+
+
+                if (this.indexHeader.getBeginTimestamp() <= 0) {
+                    timeDiff = 0;
+                } else if (timeDiff > Integer.MAX_VALUE) {
+                    timeDiff = Integer.MAX_VALUE;
+                } else if (timeDiff < 0) {
+                    timeDiff = 0;
+                }
+
+                int absIndexPos =
+                        IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
+                                + this.indexHeader.getIndexCount() * indexSize;
+
+
+                this.mappedByteBuffer.putInt(absIndexPos, keyHash);
+                this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
+                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
+                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
+
+
+                this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
+
+
+                if (this.indexHeader.getIndexCount() <= 1) {
+                    this.indexHeader.setBeginPhyOffset(phyOffset);
+                    this.indexHeader.setBeginTimestamp(storeTimestamp);
+                }
+
+                this.indexHeader.incHashSlotCount();
+                this.indexHeader.incIndexCount();
+                this.indexHeader.setEndPhyOffset(phyOffset);
+                this.indexHeader.setEndTimestamp(storeTimestamp);
+
+                return true;
+            } catch (Exception e) {
+                log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
+            } finally {
+                if (fileLock != null) {
+                    try {
+                        fileLock.release();
+                    } catch (IOException e) {
+                        e.printStackTrace();
+                    }
+                }
+            }
+        } else {
+            log.warn("putKey index count " + this.indexHeader.getIndexCount() + " index max num "
+                    + this.indexNum);
+        }
+
+        return false;
+    }
+
+    public int indexKeyHashMethod(final String key) {
+        int keyHash = key.hashCode();
+        int keyHashPositive = Math.abs(keyHash);
+        if (keyHashPositive < 0)
+            keyHashPositive = 0;
+        return keyHashPositive;
+    }
+
+    public long getBeginTimestamp() {
+        return this.indexHeader.getBeginTimestamp();
+    }
+
+    public long getEndTimestamp() {
+        return this.indexHeader.getEndTimestamp();
+    }
+
+    public long getEndPhyOffset() {
+        return this.indexHeader.getEndPhyOffset();
+    }
+
+    public boolean isTimeMatched(final long begin, final long end) {
+        boolean result =
+                begin < this.indexHeader.getBeginTimestamp() && end > this.indexHeader.getEndTimestamp();
+
+        result =
+                result
+                        || (begin >= this.indexHeader.getBeginTimestamp() && begin <= this.indexHeader
+                        .getEndTimestamp());
+
+        result =
+                result
+                        || (end >= this.indexHeader.getBeginTimestamp() && end <= this.indexHeader
+                        .getEndTimestamp());
+        return result;
+    }
+
+    public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
+                                final long begin, final long end, boolean lock) {
+        if (this.mappedFile.hold()) {
+            int keyHash = indexKeyHashMethod(key);
+            int slotPos = keyHash % this.hashSlotNum;
+            int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
+
+            FileLock fileLock = null;
+            try {
+                if (lock) {
+                    // fileLock = this.fileChannel.lock(absSlotPos,
+                    // hashSlotSize, true);
+                }
+
+                int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
+                // if (fileLock != null) {
+                // fileLock.release();
+                // fileLock = null;
+                // }
+
+                if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()
+                        || this.indexHeader.getIndexCount() <= 1) {
+                    // TODO NOTFOUND
+                } else {
+                    for (int nextIndexToRead = slotValue;;) {
+                        if (phyOffsets.size() >= maxNum) {
+                            break;
+                        }
+
+                        int absIndexPos =
+                                IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
+                                        + nextIndexToRead * indexSize;
+
+                        int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
+                        long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);
+
+                        long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
+                        int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);
+
+
+                        if (timeDiff < 0) {
+                            break;
+                        }
+
+                        timeDiff *= 1000L;
+
+                        long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
+                        boolean timeMatched = (timeRead >= begin) && (timeRead <= end);
+
+                        if (keyHash == keyHashRead && timeMatched) {
+                            phyOffsets.add(phyOffsetRead);
+                        }
+
+                        if (prevIndexRead <= invalidIndex
+                                || prevIndexRead > this.indexHeader.getIndexCount()
+                                || prevIndexRead == nextIndexToRead || timeRead < begin) {
+                            break;
+                        }
+
+                        nextIndexToRead = prevIndexRead;
+                    }
+                }
+            } catch (Exception e) {
+                log.error("selectPhyOffset exception ", e);
+            } finally {
+                if (fileLock != null) {
+                    try {
+                        fileLock.release();
+                    } catch (IOException e) {
+                        e.printStackTrace();
+                    }
+                }
+
+                this.mappedFile.release();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexHeader.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexHeader.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexHeader.java
new file mode 100644
index 0000000..d6015e3
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexHeader.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.store.index;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+/**
+
+ *
+ * @author shijia.wxr
+ *
+ */
+public class IndexHeader {
+    public static final int INDEX_HEADER_SIZE = 40;
+    private static int beginTimestampIndex = 0;
+    private static int endTimestampIndex = 8;
+    private static int beginPhyoffsetIndex = 16;
+    private static int endPhyoffsetIndex = 24;
+    private static int hashSlotcountIndex = 32;
+    private static int indexCountIndex = 36;
+    private final ByteBuffer byteBuffer;
+    private AtomicLong beginTimestamp = new AtomicLong(0);
+    private AtomicLong endTimestamp = new AtomicLong(0);
+    private AtomicLong beginPhyOffset = new AtomicLong(0);
+    private AtomicLong endPhyOffset = new AtomicLong(0);
+    private AtomicInteger hashSlotCount = new AtomicInteger(0);
+
+    private AtomicInteger indexCount = new AtomicInteger(1);
+
+
+    public IndexHeader(final ByteBuffer byteBuffer) {
+        this.byteBuffer = byteBuffer;
+    }
+
+
+    public void load() {
+        this.beginTimestamp.set(byteBuffer.getLong(beginTimestampIndex));
+        this.endTimestamp.set(byteBuffer.getLong(endTimestampIndex));
+        this.beginPhyOffset.set(byteBuffer.getLong(beginPhyoffsetIndex));
+        this.endPhyOffset.set(byteBuffer.getLong(endPhyoffsetIndex));
+
+        this.hashSlotCount.set(byteBuffer.getInt(hashSlotcountIndex));
+        this.indexCount.set(byteBuffer.getInt(indexCountIndex));
+
+        if (this.indexCount.get() <= 0) {
+            this.indexCount.set(1);
+        }
+    }
+
+    public void updateByteBuffer() {
+        this.byteBuffer.putLong(beginTimestampIndex, this.beginTimestamp.get());
+        this.byteBuffer.putLong(endTimestampIndex, this.endTimestamp.get());
+        this.byteBuffer.putLong(beginPhyoffsetIndex, this.beginPhyOffset.get());
+        this.byteBuffer.putLong(endPhyoffsetIndex, this.endPhyOffset.get());
+        this.byteBuffer.putInt(hashSlotcountIndex, this.hashSlotCount.get());
+        this.byteBuffer.putInt(indexCountIndex, this.indexCount.get());
+    }
+
+
+    public long getBeginTimestamp() {
+        return beginTimestamp.get();
+    }
+
+
+    public void setBeginTimestamp(long beginTimestamp) {
+        this.beginTimestamp.set(beginTimestamp);
+        this.byteBuffer.putLong(beginTimestampIndex, beginTimestamp);
+    }
+
+
+    public long getEndTimestamp() {
+        return endTimestamp.get();
+    }
+
+
+    public void setEndTimestamp(long endTimestamp) {
+        this.endTimestamp.set(endTimestamp);
+        this.byteBuffer.putLong(endTimestampIndex, endTimestamp);
+    }
+
+
+    public long getBeginPhyOffset() {
+        return beginPhyOffset.get();
+    }
+
+
+    public void setBeginPhyOffset(long beginPhyOffset) {
+        this.beginPhyOffset.set(beginPhyOffset);
+        this.byteBuffer.putLong(beginPhyoffsetIndex, beginPhyOffset);
+    }
+
+
+    public long getEndPhyOffset() {
+        return endPhyOffset.get();
+    }
+
+
+    public void setEndPhyOffset(long endPhyOffset) {
+        this.endPhyOffset.set(endPhyOffset);
+        this.byteBuffer.putLong(endPhyoffsetIndex, endPhyOffset);
+    }
+
+
+    public AtomicInteger getHashSlotCount() {
+        return hashSlotCount;
+    }
+
+
+    public void incHashSlotCount() {
+        int value = this.hashSlotCount.incrementAndGet();
+        this.byteBuffer.putInt(hashSlotcountIndex, value);
+    }
+
+
+    public int getIndexCount() {
+        return indexCount.get();
+    }
+
+
+    public void incIndexCount() {
+        int value = this.indexCount.incrementAndGet();
+        this.byteBuffer.putInt(indexCountIndex, value);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexService.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexService.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexService.java
new file mode 100644
index 0000000..f275f80
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/IndexService.java
@@ -0,0 +1,389 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.store.index;
+
+import com.alibaba.rocketmq.common.UtilAll;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.common.message.MessageConst;
+import com.alibaba.rocketmq.common.sysflag.MessageSysFlag;
+import com.alibaba.rocketmq.store.DefaultMessageStore;
+import com.alibaba.rocketmq.store.DispatchRequest;
+import com.alibaba.rocketmq.store.config.StorePathConfigHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class IndexService {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+    private final DefaultMessageStore defaultMessageStore;
+
+    private final int hashSlotNum;
+    private final int indexNum;
+    private final String storePath;
+
+    private final ArrayList<IndexFile> indexFileList = new ArrayList<IndexFile>();
+    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+
+
+    public IndexService(final DefaultMessageStore store) {
+        this.defaultMessageStore = store;
+        this.hashSlotNum = store.getMessageStoreConfig().getMaxHashSlotNum();
+        this.indexNum = store.getMessageStoreConfig().getMaxIndexNum();
+        this.storePath =
+                StorePathConfigHelper.getStorePathIndex(store.getMessageStoreConfig().getStorePathRootDir());
+    }
+
+
+    public boolean load(final boolean lastExitOK) {
+        File dir = new File(this.storePath);
+        File[] files = dir.listFiles();
+        if (files != null) {
+            // ascending order
+            Arrays.sort(files);
+            for (File file : files) {
+                try {
+                    IndexFile f = new IndexFile(file.getPath(), this.hashSlotNum, this.indexNum, 0, 0);
+                    f.load();
+
+                    if (!lastExitOK) {
+                        if (f.getEndTimestamp() > this.defaultMessageStore.getStoreCheckpoint()
+                                .getIndexMsgTimestamp()) {
+                            f.destroy(0);
+                            continue;
+                        }
+                    }
+
+                    log.info("load index file OK, " + f.getFileName());
+                    this.indexFileList.add(f);
+                } catch (IOException e) {
+                    log.error("load file " + file + " error", e);
+                    return false;
+                } catch (NumberFormatException e) {
+                    continue;
+                }
+            }
+        }
+
+        return true;
+    }
+
+    public void deleteExpiredFile(long offset) {
+        Object[] files = null;
+        try {
+            this.readWriteLock.readLock().lock();
+            if (this.indexFileList.isEmpty()) {
+                return;
+            }
+
+            long endPhyOffset = this.indexFileList.get(0).getEndPhyOffset();
+            if (endPhyOffset < offset) {
+                files = this.indexFileList.toArray();
+            }
+        } catch (Exception e) {
+            log.error("destroy exception", e);
+        } finally {
+            this.readWriteLock.readLock().unlock();
+        }
+
+        if (files != null) {
+            List<IndexFile> fileList = new ArrayList<IndexFile>();
+            for (int i = 0; i < (files.length - 1); i++) {
+                IndexFile f = (IndexFile) files[i];
+                if (f.getEndPhyOffset() < offset) {
+                    fileList.add(f);
+                } else {
+                    break;
+                }
+            }
+
+            this.deleteExpiredFile(fileList);
+        }
+    }
+
+    private void deleteExpiredFile(List<IndexFile> files) {
+        if (!files.isEmpty()) {
+            try {
+                this.readWriteLock.writeLock().lock();
+                for (IndexFile file : files) {
+                    boolean destroyed = file.destroy(3000);
+                    destroyed = destroyed && this.indexFileList.remove(file);
+                    if (!destroyed) {
+                        log.error("deleteExpiredFile remove failed.");
+                        break;
+                    }
+                }
+            } catch (Exception e) {
+                log.error("deleteExpiredFile has exception.", e);
+            } finally {
+                this.readWriteLock.writeLock().unlock();
+            }
+        }
+    }
+
+
+    public void destroy() {
+        try {
+            this.readWriteLock.readLock().lock();
+            for (IndexFile f : this.indexFileList) {
+                f.destroy(1000 * 3);
+            }
+            this.indexFileList.clear();
+        } catch (Exception e) {
+            log.error("destroy exception", e);
+        } finally {
+            this.readWriteLock.readLock().unlock();
+        }
+    }
+
+
+    public QueryOffsetResult queryOffset(String topic, String key, int maxNum, long begin, long end) {
+        List<Long> phyOffsets = new ArrayList<Long>(maxNum);
+
+        long indexLastUpdateTimestamp = 0;
+        long indexLastUpdatePhyoffset = 0;
+        maxNum = Math.min(maxNum, this.defaultMessageStore.getMessageStoreConfig().getMaxMsgsNumBatch());
+        try {
+            this.readWriteLock.readLock().lock();
+            if (!this.indexFileList.isEmpty()) {
+                for (int i = this.indexFileList.size(); i > 0; i--) {
+                    IndexFile f = this.indexFileList.get(i - 1);
+                    boolean lastFile = i == this.indexFileList.size();
+                    if (lastFile) {
+                        indexLastUpdateTimestamp = f.getEndTimestamp();
+                        indexLastUpdatePhyoffset = f.getEndPhyOffset();
+                    }
+
+                    if (f.isTimeMatched(begin, end)) {
+
+                        f.selectPhyOffset(phyOffsets, buildKey(topic, key), maxNum, begin, end, lastFile);
+                    }
+
+
+                    if (f.getBeginTimestamp() < begin) {
+                        break;
+                    }
+
+                    if (phyOffsets.size() >= maxNum) {
+                        break;
+                    }
+                }
+            }
+        } catch (Exception e) {
+            log.error("queryMsg exception", e);
+        } finally {
+            this.readWriteLock.readLock().unlock();
+        }
+
+        return new QueryOffsetResult(phyOffsets, indexLastUpdateTimestamp, indexLastUpdatePhyoffset);
+    }
+
+
+    private String buildKey(final String topic, final String key) {
+        return topic + "#" + key;
+    }
+
+
+    public void buildIndex(DispatchRequest req) {
+        IndexFile indexFile = retryGetAndCreateIndexFile();
+        if (indexFile != null) {
+            long endPhyOffset = indexFile.getEndPhyOffset();
+            DispatchRequest msg = req;
+            String topic = msg.getTopic();
+            String keys = msg.getKeys();
+            if (msg.getCommitLogOffset() < endPhyOffset) {
+                return;
+            }
+
+            final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
+            switch (tranType) {
+                case MessageSysFlag.TRANSACTION_NOT_TYPE:
+                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
+                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
+                    break;
+                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
+                    return;
+            }
+
+            if (req.getUniqKey() != null) {
+                indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));
+                if (indexFile == null) {
+                    log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
+                    return;
+                }
+            }
+
+            if (keys != null && keys.length() > 0) {
+                String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);
+                for (int i = 0; i < keyset.length; i++) {
+                    String key = keyset[i];
+                    if (key.length() > 0) {
+                        indexFile = putKey(indexFile, msg, buildKey(topic, key));
+                        if (indexFile == null) {
+                            log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
+                            return;
+                        }
+                    }
+                }
+            }
+        } else {
+            log.error("build index error, stop building index");
+        }
+    }
+
+
+    private IndexFile putKey(IndexFile indexFile, DispatchRequest msg, String idxKey) {
+        for (boolean ok =
+             indexFile.putKey(idxKey, msg.getCommitLogOffset(),
+                     msg.getStoreTimestamp()); !ok; ) {
+            log.warn("index file full, so create another one, " + indexFile.getFileName());
+            indexFile = retryGetAndCreateIndexFile();
+            if (null == indexFile) {
+                return null;
+            }
+
+            ok =
+                    indexFile.putKey(idxKey, msg.getCommitLogOffset(),
+                            msg.getStoreTimestamp());
+        }
+        return indexFile;
+    }
+
+
+    public IndexFile retryGetAndCreateIndexFile() {
+        IndexFile indexFile = null;
+
+
+        for (int times = 0; null == indexFile && times < 3; times++) {
+            indexFile = this.getAndCreateLastIndexFile();
+            if (null != indexFile)
+                break;
+
+            try {
+                log.error("try to create index file, " + times + " times");
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
+
+
+        if (null == indexFile) {
+            this.defaultMessageStore.getAccessRights().makeIndexFileError();
+            log.error("mark index file can not build flag");
+        }
+
+        return indexFile;
+    }
+
+
+    public IndexFile getAndCreateLastIndexFile() {
+        IndexFile indexFile = null;
+        IndexFile prevIndexFile = null;
+        long lastUpdateEndPhyOffset = 0;
+        long lastUpdateIndexTimestamp = 0;
+
+        {
+            this.readWriteLock.readLock().lock();
+            if (!this.indexFileList.isEmpty()) {
+                IndexFile tmp = this.indexFileList.get(this.indexFileList.size() - 1);
+                if (!tmp.isWriteFull()) {
+                    indexFile = tmp;
+                } else {
+                    lastUpdateEndPhyOffset = tmp.getEndPhyOffset();
+                    lastUpdateIndexTimestamp = tmp.getEndTimestamp();
+                    prevIndexFile = tmp;
+                }
+            }
+
+            this.readWriteLock.readLock().unlock();
+        }
+
+
+        if (indexFile == null) {
+            try {
+                String fileName =
+                        this.storePath + File.separator
+                                + UtilAll.timeMillisToHumanString(System.currentTimeMillis());
+                indexFile =
+                        new IndexFile(fileName, this.hashSlotNum, this.indexNum, lastUpdateEndPhyOffset,
+                                lastUpdateIndexTimestamp);
+                this.readWriteLock.writeLock().lock();
+                this.indexFileList.add(indexFile);
+            } catch (Exception e) {
+                log.error("getLastIndexFile exception ", e);
+            } finally {
+                this.readWriteLock.writeLock().unlock();
+            }
+
+
+            if (indexFile != null) {
+                final IndexFile flushThisFile = prevIndexFile;
+                Thread flushThread = new Thread(new Runnable() {
+                    @Override
+                    public void run() {
+                        IndexService.this.flush(flushThisFile);
+                    }
+                }, "FlushIndexFileThread");
+
+                flushThread.setDaemon(true);
+                flushThread.start();
+            }
+        }
+
+        return indexFile;
+    }
+
+
+    public void flush(final IndexFile f) {
+        if (null == f)
+            return;
+
+        long indexMsgTimestamp = 0;
+
+        if (f.isWriteFull()) {
+            indexMsgTimestamp = f.getEndTimestamp();
+        }
+
+        f.flush();
+
+        if (indexMsgTimestamp > 0) {
+            this.defaultMessageStore.getStoreCheckpoint().setIndexMsgTimestamp(indexMsgTimestamp);
+            this.defaultMessageStore.getStoreCheckpoint().flush();
+        }
+    }
+
+
+    public void start() {
+
+    }
+
+
+    public void shutdown() {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/QueryOffsetResult.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/QueryOffsetResult.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/QueryOffsetResult.java
new file mode 100644
index 0000000..89d0755
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/index/QueryOffsetResult.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.store.index;
+
+import java.util.List;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class QueryOffsetResult {
+    private final List<Long> phyOffsets;
+    private final long indexLastUpdateTimestamp;
+    private final long indexLastUpdatePhyoffset;
+
+
+    public QueryOffsetResult(List<Long> phyOffsets, long indexLastUpdateTimestamp,
+                             long indexLastUpdatePhyoffset) {
+        this.phyOffsets = phyOffsets;
+        this.indexLastUpdateTimestamp = indexLastUpdateTimestamp;
+        this.indexLastUpdatePhyoffset = indexLastUpdatePhyoffset;
+    }
+
+
+    public List<Long> getPhyOffsets() {
+        return phyOffsets;
+    }
+
+
+    public long getIndexLastUpdateTimestamp() {
+        return indexLastUpdateTimestamp;
+    }
+
+
+    public long getIndexLastUpdatePhyoffset() {
+        return indexLastUpdatePhyoffset;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/schedule/DelayOffsetSerializeWrapper.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/schedule/DelayOffsetSerializeWrapper.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/schedule/DelayOffsetSerializeWrapper.java
new file mode 100644
index 0000000..5f4720d
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/schedule/DelayOffsetSerializeWrapper.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.store.schedule;
+
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * @author shijia.wxr
+ *
+ */
+public class DelayOffsetSerializeWrapper extends RemotingSerializable {
+    private ConcurrentHashMap<Integer /* level */, Long/* offset */> offsetTable =
+            new ConcurrentHashMap<Integer, Long>(32);
+
+
+    public ConcurrentHashMap<Integer, Long> getOffsetTable() {
+        return offsetTable;
+    }
+
+
+    public void setOffsetTable(ConcurrentHashMap<Integer, Long> offsetTable) {
+        this.offsetTable = offsetTable;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/schedule/ScheduleMessageService.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/schedule/ScheduleMessageService.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/schedule/ScheduleMessageService.java
new file mode 100644
index 0000000..e243a7e
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/schedule/ScheduleMessageService.java
@@ -0,0 +1,386 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.store.schedule;
+
+import com.alibaba.rocketmq.common.ConfigManager;
+import com.alibaba.rocketmq.common.TopicFilterType;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.common.message.MessageAccessor;
+import com.alibaba.rocketmq.common.message.MessageConst;
+import com.alibaba.rocketmq.common.message.MessageDecoder;
+import com.alibaba.rocketmq.common.message.MessageExt;
+import com.alibaba.rocketmq.common.running.RunningStats;
+import com.alibaba.rocketmq.store.*;
+import com.alibaba.rocketmq.store.config.StorePathConfigHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * @author shijia.wxr
+ *
+ */
+public class ScheduleMessageService extends ConfigManager {
+    public static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+    private static final long FIRST_DELAY_TIME = 1000L;
+    private static final long DELAY_FOR_A_WHILE = 100L;
+    private static final long DELAY_FOR_A_PERIOD = 10000L;
+
+    private final ConcurrentHashMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
+            new ConcurrentHashMap<Integer, Long>(32);
+
+    private final ConcurrentHashMap<Integer /* level */, Long/* offset */> offsetTable =
+            new ConcurrentHashMap<Integer, Long>(32);
+
+    private final Timer timer = new Timer("ScheduleMessageTimerThread", true);
+
+    private final DefaultMessageStore defaultMessageStore;
+
+    private int maxDelayLevel;
+
+
+    public ScheduleMessageService(final DefaultMessageStore defaultMessageStore) {
+        this.defaultMessageStore = defaultMessageStore;
+    }
+
+    public static int queueId2DelayLevel(final int queueId) {
+        return queueId + 1;
+    }
+
+    public void buildRunningStats(HashMap<String, String> stats) {
+        Iterator<Entry<Integer, Long>> it = this.offsetTable.entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<Integer, Long> next = it.next();
+            int queueId = delayLevel2QueueId(next.getKey());
+            long delayOffset = next.getValue();
+            long maxOffset = this.defaultMessageStore.getMaxOffsetInQuque(SCHEDULE_TOPIC, queueId);
+            String value = String.format("%d,%d", delayOffset, maxOffset);
+            String key = String.format("%s_%d", RunningStats.scheduleMessageOffset.name(), next.getKey());
+            stats.put(key, value);
+        }
+    }
+
+    public static int delayLevel2QueueId(final int delayLevel) {
+        return delayLevel - 1;
+    }
+
+
+    private void updateOffset(int delayLevel, long offset) {
+        this.offsetTable.put(delayLevel, offset);
+    }
+
+
+    public long computeDeliverTimestamp(final int delayLevel, final long storeTimestamp) {
+        Long time = this.delayLevelTable.get(delayLevel);
+        if (time != null) {
+            return time + storeTimestamp;
+        }
+
+        return storeTimestamp + 1000;
+    }
+
+
+    public void start() {
+
+        for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
+            Integer level = entry.getKey();
+            Long timeDelay = entry.getValue();
+            Long offset = this.offsetTable.get(level);
+            if (null == offset) {
+                offset = 0L;
+            }
+
+            if (timeDelay != null) {
+                this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
+            }
+        }
+
+        this.timer.scheduleAtFixedRate(new TimerTask() {
+
+            @Override
+            public void run() {
+                try {
+                    ScheduleMessageService.this.persist();
+                } catch (Exception e) {
+                    log.error("scheduleAtFixedRate flush exception", e);
+                }
+            }
+        }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
+    }
+
+
+    public void shutdown() {
+        this.timer.cancel();
+    }
+
+
+    public int getMaxDelayLevel() {
+        return maxDelayLevel;
+    }
+
+
+    public String encode() {
+        return this.encode(false);
+    }
+
+    public boolean load() {
+        boolean result = super.load();
+        result = result && this.parseDelayLevel();
+        return result;
+    }
+
+    @Override
+    public String configFilePath() {
+        return StorePathConfigHelper.getDelayOffsetStorePath(this.defaultMessageStore.getMessageStoreConfig()
+                .getStorePathRootDir());
+    }
+
+    @Override
+    public void decode(String jsonString) {
+        if (jsonString != null) {
+            DelayOffsetSerializeWrapper delayOffsetSerializeWrapper =
+                    DelayOffsetSerializeWrapper.fromJson(jsonString, DelayOffsetSerializeWrapper.class);
+            if (delayOffsetSerializeWrapper != null) {
+                this.offsetTable.putAll(delayOffsetSerializeWrapper.getOffsetTable());
+            }
+        }
+    }
+
+    public String encode(final boolean prettyFormat) {
+        DelayOffsetSerializeWrapper delayOffsetSerializeWrapper = new DelayOffsetSerializeWrapper();
+        delayOffsetSerializeWrapper.setOffsetTable(this.offsetTable);
+        return delayOffsetSerializeWrapper.toJson(prettyFormat);
+    }
+
+    public boolean parseDelayLevel() {
+        HashMap<String, Long> timeUnitTable = new HashMap<String, Long>();
+        timeUnitTable.put("s", 1000L);
+        timeUnitTable.put("m", 1000L * 60);
+        timeUnitTable.put("h", 1000L * 60 * 60);
+        timeUnitTable.put("d", 1000L * 60 * 60 * 24);
+
+        String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();
+        try {
+            String[] levelArray = levelString.split(" ");
+            for (int i = 0; i < levelArray.length; i++) {
+                String value = levelArray[i];
+                String ch = value.substring(value.length() - 1);
+                Long tu = timeUnitTable.get(ch);
+
+                int level = i + 1;
+                if (level > this.maxDelayLevel) {
+                    this.maxDelayLevel = level;
+                }
+                long num = Long.parseLong(value.substring(0, value.length() - 1));
+                long delayTimeMillis = tu * num;
+                this.delayLevelTable.put(level, delayTimeMillis);
+            }
+        } catch (Exception e) {
+            log.error("parseDelayLevel exception", e);
+            log.info("levelString String = {}", levelString);
+            return false;
+        }
+
+        return true;
+    }
+
+    class DeliverDelayedMessageTimerTask extends TimerTask {
+        private final int delayLevel;
+        private final long offset;
+
+
+        public DeliverDelayedMessageTimerTask(int delayLevel, long offset) {
+            this.delayLevel = delayLevel;
+            this.offset = offset;
+        }
+
+
+        @Override
+        public void run() {
+            try {
+                this.executeOnTimeup();
+            } catch (Exception e) {
+                // XXX: warn and notify me
+                log.error("ScheduleMessageService, executeOnTimeup exception", e);
+                ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
+                        this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
+            }
+        }
+
+
+        /**
+
+         *
+         * @return
+         */
+        private long correctDeliverTimestamp(final long now, final long deliverTimestamp) {
+
+            long result = deliverTimestamp;
+
+            long maxTimestamp = now + ScheduleMessageService.this.delayLevelTable.get(this.delayLevel);
+            if (deliverTimestamp > maxTimestamp) {
+                result = now;
+            }
+
+            return result;
+        }
+
+
+        public void executeOnTimeup() {
+            ConsumeQueue cq =
+                    ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,
+                            delayLevel2QueueId(delayLevel));
+
+            long failScheduleOffset = offset;
+
+            if (cq != null) {
+                SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
+                if (bufferCQ != null) {
+                    try {
+                        long nextOffset = offset;
+                        int i = 0;
+                        for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
+                            long offsetPy = bufferCQ.getByteBuffer().getLong();
+                            int sizePy = bufferCQ.getByteBuffer().getInt();
+                            long tagsCode = bufferCQ.getByteBuffer().getLong();
+
+
+                            long now = System.currentTimeMillis();
+                            long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
+
+                            nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
+
+                            long countdown = deliverTimestamp - now;
+
+                            if (countdown <= 0) {
+                                MessageExt msgExt =
+                                        ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
+                                                offsetPy, sizePy);
+
+                                if (msgExt != null) {
+                                    try {
+                                        MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
+                                        PutMessageResult putMessageResult =
+                                                ScheduleMessageService.this.defaultMessageStore
+                                                        .putMessage(msgInner);
+
+                                        if (putMessageResult != null
+                                                && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
+                                            continue;
+                                        }
+
+                                        else {
+                                            // XXX: warn and notify me
+                                            log.error(
+                                                    "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
+                                                    msgExt.getTopic(), msgExt.getMsgId());
+                                            ScheduleMessageService.this.timer.schedule(
+                                                    new DeliverDelayedMessageTimerTask(this.delayLevel,
+                                                            nextOffset), DELAY_FOR_A_PERIOD);
+                                            ScheduleMessageService.this.updateOffset(this.delayLevel,
+                                                    nextOffset);
+                                            return;
+                                        }
+                                    } catch (Exception e) {
+                                        /*
+                                         * XXX: warn and notify me
+
+
+
+                                         */
+                                        log.error(
+                                                "ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
+                                                        + msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
+                                                        + offsetPy + ",sizePy=" + sizePy, e);
+                                    }
+                                }
+                            }
+
+                            else {
+                                ScheduleMessageService.this.timer.schedule(
+                                        new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
+                                        countdown);
+                                ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
+                                return;
+                            }
+                        } // end of for
+
+                        nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
+                        ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
+                                this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
+                        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
+                        return;
+                    } finally {
+
+                        bufferCQ.release();
+                    }
+                } // end of if (bufferCQ != null)
+                else {
+                    /*
+
+
+                     */
+                    long cqMinOffset = cq.getMinOffsetInQuque();
+                    if (offset < cqMinOffset) {
+                        failScheduleOffset = cqMinOffset;
+                        log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
+                                + cqMinOffset + ", queueId=" + cq.getQueueId());
+                    }
+                }
+            } // end of if (cq != null)
+
+            ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
+                    failScheduleOffset), DELAY_FOR_A_WHILE);
+        }
+
+
+        private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {
+            MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
+            msgInner.setBody(msgExt.getBody());
+            msgInner.setFlag(msgExt.getFlag());
+            MessageAccessor.setProperties(msgInner, msgExt.getProperties());
+
+            TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag());
+            long tagsCodeValue =
+                    MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
+            msgInner.setTagsCode(tagsCodeValue);
+            msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
+
+            msgInner.setSysFlag(msgExt.getSysFlag());
+            msgInner.setBornTimestamp(msgExt.getBornTimestamp());
+            msgInner.setBornHost(msgExt.getBornHost());
+            msgInner.setStoreHost(msgExt.getStoreHost());
+            msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());
+
+            msgInner.setWaitStoreMsgOK(false);
+            MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
+
+            msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));
+
+            String queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);
+            int queueId = Integer.parseInt(queueIdStr);
+            msgInner.setQueueId(queueId);
+
+            return msgInner;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/stats/BrokerStats.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/stats/BrokerStats.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/stats/BrokerStats.java
new file mode 100644
index 0000000..539d4be
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/stats/BrokerStats.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.store.stats;
+
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.store.DefaultMessageStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class BrokerStats {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+    private final DefaultMessageStore defaultMessageStore;
+
+    private volatile long msgPutTotalYesterdayMorning;
+
+    private volatile long msgPutTotalTodayMorning;
+
+    private volatile long msgGetTotalYesterdayMorning;
+
+    private volatile long msgGetTotalTodayMorning;
+
+
+    public BrokerStats(DefaultMessageStore defaultMessageStore) {
+        this.defaultMessageStore = defaultMessageStore;
+    }
+
+
+    /**
+
+     */
+    public void record() {
+        this.msgPutTotalYesterdayMorning = this.msgPutTotalTodayMorning;
+        this.msgGetTotalYesterdayMorning = this.msgGetTotalTodayMorning;
+
+        this.msgPutTotalTodayMorning =
+                this.defaultMessageStore.getStoreStatsService().getPutMessageTimesTotal();
+        this.msgGetTotalTodayMorning =
+                this.defaultMessageStore.getStoreStatsService().getGetMessageTransferedMsgCount().get();
+
+        log.info("yesterday put message total: {}", msgPutTotalTodayMorning - msgPutTotalYesterdayMorning);
+        log.info("yesterday get message total: {}", msgGetTotalTodayMorning - msgGetTotalYesterdayMorning);
+    }
+
+
+    public long getMsgPutTotalYesterdayMorning() {
+        return msgPutTotalYesterdayMorning;
+    }
+
+
+    public void setMsgPutTotalYesterdayMorning(long msgPutTotalYesterdayMorning) {
+        this.msgPutTotalYesterdayMorning = msgPutTotalYesterdayMorning;
+    }
+
+
+    public long getMsgPutTotalTodayMorning() {
+        return msgPutTotalTodayMorning;
+    }
+
+
+    public void setMsgPutTotalTodayMorning(long msgPutTotalTodayMorning) {
+        this.msgPutTotalTodayMorning = msgPutTotalTodayMorning;
+    }
+
+
+    public long getMsgGetTotalYesterdayMorning() {
+        return msgGetTotalYesterdayMorning;
+    }
+
+
+    public void setMsgGetTotalYesterdayMorning(long msgGetTotalYesterdayMorning) {
+        this.msgGetTotalYesterdayMorning = msgGetTotalYesterdayMorning;
+    }
+
+
+    public long getMsgGetTotalTodayMorning() {
+        return msgGetTotalTodayMorning;
+    }
+
+
+    public void setMsgGetTotalTodayMorning(long msgGetTotalTodayMorning) {
+        this.msgGetTotalTodayMorning = msgGetTotalTodayMorning;
+    }
+
+
+    public long getMsgPutTotalTodayNow() {
+        return this.defaultMessageStore.getStoreStatsService().getPutMessageTimesTotal();
+    }
+
+
+    public long getMsgGetTotalTodayNow() {
+        return this.defaultMessageStore.getStoreStatsService().getGetMessageTransferedMsgCount().get();
+    }
+}