You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/19 09:40:23 UTC

[06/43] incubator-rocketmq git commit: Finish code dump. Reviewed by: @yukon @vongosling @stevenschew @vintagewang @lollipop @zander

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/stats/BrokerStatsManager.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/stats/BrokerStatsManager.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/stats/BrokerStatsManager.java
new file mode 100644
index 0000000..60b3fff
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/stats/BrokerStatsManager.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.store.stats;
+
+import com.alibaba.rocketmq.common.ThreadFactoryImpl;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.common.stats.MomentStatsItemSet;
+import com.alibaba.rocketmq.common.stats.StatsItem;
+import com.alibaba.rocketmq.common.stats.StatsItemSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+
+/**
+ * Registry of broker-side statistics.
+ *
+ * <p>Every named statistic is backed by a {@link StatsItemSet} stored in {@code statsTable}.
+ * Regular statistics are created with {@code scheduledExecutorService} and the stats logger;
+ * the {@code COMMERCIAL_*} statistics are created with a dedicated executor and logger.
+ * The table is filled once in the constructor and only read afterwards.
+ */
+public class BrokerStatsManager {
+
+    public static final String TOPIC_PUT_NUMS = "TOPIC_PUT_NUMS";
+    public static final String TOPIC_PUT_SIZE = "TOPIC_PUT_SIZE";
+    public static final String GROUP_GET_NUMS = "GROUP_GET_NUMS";
+    public static final String GROUP_GET_SIZE = "GROUP_GET_SIZE";
+    public static final String SNDBCK_PUT_NUMS = "SNDBCK_PUT_NUMS";
+    public static final String BROKER_PUT_NUMS = "BROKER_PUT_NUMS";
+    public static final String BROKER_GET_NUMS = "BROKER_GET_NUMS";
+    public static final String GROUP_GET_FROM_DISK_NUMS = "GROUP_GET_FROM_DISK_NUMS";
+    public static final String GROUP_GET_FROM_DISK_SIZE = "GROUP_GET_FROM_DISK_SIZE";
+    public static final String BROKER_GET_FROM_DISK_NUMS = "BROKER_GET_FROM_DISK_NUMS";
+    public static final String BROKER_GET_FROM_DISK_SIZE = "BROKER_GET_FROM_DISK_SIZE";
+    // For commercial
+    public static final String COMMERCIAL_SEND_TIMES = "COMMERCIAL_SEND_TIMES";
+    public static final String COMMERCIAL_SNDBCK_TIMES = "COMMERCIAL_SNDBCK_TIMES";
+    public static final String COMMERCIAL_RCV_TIMES = "COMMERCIAL_RCV_TIMES";
+    public static final String COMMERCIAL_RCV_EPOLLS = "COMMERCIAL_RCV_EPOLLS";
+    public static final String COMMERCIAL_SEND_SIZE = "COMMERCIAL_SEND_SIZE";
+    public static final String COMMERCIAL_RCV_SIZE = "COMMERCIAL_RCV_SIZE";
+    public static final String COMMERCIAL_PERM_FAILURES = "COMMERCIAL_PERM_FAILURES";
+    public static final String COMMERCIAL_OWNER = "Owner";
+    // Message Size limit for one api-calling count.
+    public static final double SIZE_PER_COUNT = 64 * 1024;
+
+    public static final String GROUP_GET_FALL_SIZE = "GROUP_GET_FALL_SIZE";
+    public static final String GROUP_GET_FALL_TIME = "GROUP_GET_FALL_TIME";
+    // Pull Message Latency
+    public static final String GROUP_GET_LATENCY = "GROUP_GET_LATENCY";
+
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_STATS_LOGGER_NAME);
+    private static final Logger COMMERCIAL_LOG = LoggerFactory.getLogger(LoggerName.COMMERCIAL_LOGGER_NAME);
+
+    /** Statistics created with {@code scheduledExecutorService} and {@code log}, in registration order. */
+    private static final String[] BROKER_STATS_NAMES = {
+        TOPIC_PUT_NUMS,
+        TOPIC_PUT_SIZE,
+        GROUP_GET_NUMS,
+        GROUP_GET_SIZE,
+        GROUP_GET_LATENCY,
+        SNDBCK_PUT_NUMS,
+        BROKER_PUT_NUMS,
+        BROKER_GET_NUMS,
+        GROUP_GET_FROM_DISK_NUMS,
+        GROUP_GET_FROM_DISK_SIZE,
+        BROKER_GET_FROM_DISK_NUMS,
+        BROKER_GET_FROM_DISK_SIZE
+    };
+
+    /** Statistics created with {@code commercialExecutor} and {@code COMMERCIAL_LOG}, in registration order. */
+    private static final String[] COMMERCIAL_STATS_NAMES = {
+        COMMERCIAL_SEND_TIMES,
+        COMMERCIAL_RCV_TIMES,
+        COMMERCIAL_SEND_SIZE,
+        COMMERCIAL_RCV_SIZE,
+        COMMERCIAL_RCV_EPOLLS,
+        COMMERCIAL_SNDBCK_TIMES,
+        COMMERCIAL_PERM_FAILURES
+    };
+
+    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
+            "BrokerStatsThread"));
+    private final ScheduledExecutorService commercialExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
+            "CommercialStatsThread"));
+    private final HashMap<String, StatsItemSet> statsTable = new HashMap<String, StatsItemSet>();
+    private final String clusterName;
+    // Fall-behind values written by recordDiskFallBehindSize / recordDiskFallBehindTime.
+    private final MomentStatsItemSet momentStatsItemSetFallSize = new MomentStatsItemSet(GROUP_GET_FALL_SIZE, scheduledExecutorService, log);
+    private final MomentStatsItemSet momentStatsItemSetFallTime = new MomentStatsItemSet(GROUP_GET_FALL_TIME, scheduledExecutorService, log);
+
+    /**
+     * Creates the manager and registers every known statistic.
+     *
+     * @param clusterName key under which the broker-wide counters
+     *                    ({@link #BROKER_PUT_NUMS}, {@link #BROKER_GET_NUMS}) are stored
+     */
+    public BrokerStatsManager(String clusterName) {
+        this.clusterName = clusterName;
+
+        for (String statsName : BROKER_STATS_NAMES) {
+            registerStats(statsName, this.scheduledExecutorService, log);
+        }
+
+        for (String statsName : COMMERCIAL_STATS_NAMES) {
+            registerStats(statsName, this.commercialExecutor, COMMERCIAL_LOG);
+        }
+    }
+
+    /** Creates the {@link StatsItemSet} for {@code statsName} and stores it in the table. */
+    private void registerStats(final String statsName, final ScheduledExecutorService executor, final Logger logger) {
+        this.statsTable.put(statsName, new StatsItemSet(statsName, executor, logger));
+    }
+
+    public MomentStatsItemSet getMomentStatsItemSetFallSize() {
+        return momentStatsItemSetFallSize;
+    }
+
+    public MomentStatsItemSet getMomentStatsItemSetFallTime() {
+        return momentStatsItemSetFallTime;
+    }
+
+    /** No-op; the executors are created together with the instance. */
+    public void start() {
+    }
+
+    /**
+     * Stops both statistic executors.
+     *
+     * <p>The commercial executor must be stopped as well, otherwise its thread is
+     * never released after the manager has been shut down.
+     */
+    public void shutdown() {
+        this.scheduledExecutorService.shutdown();
+        this.commercialExecutor.shutdown();
+    }
+
+    /**
+     * Looks up a single statistic item.
+     *
+     * @param statsName name of the statistic set, e.g. {@link #TOPIC_PUT_NUMS}
+     * @param statsKey  key of the item inside that set
+     * @return the item, or {@code null} when the set is unknown or the lookup fails
+     */
+    public StatsItem getStatsItem(final String statsName, final String statsKey) {
+        final StatsItemSet statsItemSet = this.statsTable.get(statsName);
+        if (statsItemSet == null) {
+            return null;
+        }
+
+        try {
+            return statsItemSet.getStatsItem(statsKey);
+        } catch (Exception e) {
+            // Callers expect null on failure; keep that contract but leave a trace.
+            log.warn("getStatsItem failed, statsName=" + statsName + ", statsKey=" + statsKey, e);
+            return null;
+        }
+    }
+
+    public void incTopicPutNums(final String topic) {
+        this.statsTable.get(TOPIC_PUT_NUMS).addValue(topic, 1, 1);
+    }
+
+    public void incTopicPutSize(final String topic, final int size) {
+        this.statsTable.get(TOPIC_PUT_SIZE).addValue(topic, size, 1);
+    }
+
+    public void incGroupGetNums(final String group, final String topic, final int incValue) {
+        final String statsKey = buildStatsKey(topic, group);
+        this.statsTable.get(GROUP_GET_NUMS).addValue(statsKey, incValue, 1);
+    }
+
+    /**
+     * Builds the {@code topic@group} key used by the per-group statistics.
+     *
+     * @return {@code topic + "@" + group}
+     */
+    public String buildStatsKey(String topic, String group) {
+        // The builder never leaves this method, so the unsynchronized StringBuilder is sufficient.
+        StringBuilder strBuilder = new StringBuilder();
+        strBuilder.append(topic);
+        strBuilder.append("@");
+        strBuilder.append(group);
+        return strBuilder.toString();
+    }
+
+    /** Builds the {@code queueId@topic@group} key shared by the per-queue statistics. */
+    private static String buildQueueStatsKey(final int queueId, final String topic, final String group) {
+        return String.format("%d@%s@%s", queueId, topic, group);
+    }
+
+    public void incGroupGetSize(final String group, final String topic, final int incValue) {
+        final String statsKey = buildStatsKey(topic, group);
+        this.statsTable.get(GROUP_GET_SIZE).addValue(statsKey, incValue, 1);
+    }
+
+    public void incGroupGetLatency(final String group, final String topic, final int queueId, final int incValue) {
+        final String statsKey = buildQueueStatsKey(queueId, topic, group);
+        this.statsTable.get(GROUP_GET_LATENCY).addValue(statsKey, incValue, 1);
+    }
+
+
+    public void incBrokerPutNums() {
+        this.statsTable.get(BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().incrementAndGet();
+    }
+
+
+    public void incBrokerGetNums(final int incValue) {
+        this.statsTable.get(BROKER_GET_NUMS).getAndCreateStatsItem(this.clusterName).getValue().addAndGet(incValue);
+    }
+
+
+    public void incSendBackNums(final String group, final String topic) {
+        final String statsKey = buildStatsKey(topic, group);
+        this.statsTable.get(SNDBCK_PUT_NUMS).addValue(statsKey, 1, 1);
+    }
+
+
+    /**
+     * @return the TPS reported by the one-minute data of {@link #GROUP_GET_NUMS}
+     *         for the given topic and group
+     */
+    public double tpsGroupGetNums(final String group, final String topic) {
+        final String statsKey = buildStatsKey(topic, group);
+        return this.statsTable.get(GROUP_GET_NUMS).getStatsDataInMinute(statsKey).getTps();
+    }
+
+
+    public void recordDiskFallBehindTime(final String group, final String topic, final int queueId, final long fallBehind) {
+        final String statsKey = buildQueueStatsKey(queueId, topic, group);
+        this.momentStatsItemSetFallTime.getAndCreateStatsItem(statsKey).getValue().set(fallBehind);
+    }
+
+
+    public void recordDiskFallBehindSize(final String group, final String topic, final int queueId, final long fallBehind) {
+        final String statsKey = buildQueueStatsKey(queueId, topic, group);
+        this.momentStatsItemSetFallSize.getAndCreateStatsItem(statsKey).getValue().set(fallBehind);
+    }
+
+    /**
+     * Adds {@code incValue} to the statistic set registered under {@code key}.
+     *
+     * @param key name of a registered statistic set; an unregistered name results in a
+     *            {@link NullPointerException}
+     */
+    public void incCommercialValue(final String key, final String owner, final String group,
+                                   final String topic, final String type, final int incValue) {
+        final String statsKey = buildCommercialStatsKey(owner, topic, group, type);
+        this.statsTable.get(key).addValue(statsKey, incValue, 1);
+    }
+
+    /**
+     * Builds the {@code owner@topic@group@type} key used by the commercial statistics.
+     */
+    public String buildCommercialStatsKey(String owner, String topic, String group, String type) {
+        StringBuilder strBuilder = new StringBuilder();
+        strBuilder.append(owner);
+        strBuilder.append("@");
+        strBuilder.append(topic);
+        strBuilder.append("@");
+        strBuilder.append(group);
+        strBuilder.append("@");
+        strBuilder.append(type);
+        return strBuilder.toString();
+    }
+
+
+    public enum StatsType {
+        SEND_SUCCESS,
+        SEND_FAILURE,
+        SEND_BACK,
+        SEND_TIMER,
+        SEND_TRANSACTION,
+        RCV_SUCCESS,
+        RCV_EPOLLS,
+        PERM_FAILURE
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/util/LibC.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/util/LibC.java b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/util/LibC.java
new file mode 100644
index 0000000..7a3b37c
--- /dev/null
+++ b/rocketmq-store/src/main/java/com/alibaba/rocketmq/store/util/LibC.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.store.util;
+
+import com.sun.jna.Library;
+import com.sun.jna.Native;
+import com.sun.jna.NativeLong;
+import com.sun.jna.Platform;
+import com.sun.jna.Pointer;
+
+
+/**
+ * JNA mapping of a handful of C runtime functions.
+ *
+ * <p>The shared instance is loaded from {@code msvcrt} on Windows and from {@code c}
+ * on every other platform.
+ */
+public interface LibC extends Library {
+    /** Shared binding, resolved once when this interface is initialized. */
+    LibC INSTANCE = (LibC) Native.loadLibrary(Platform.isWindows() ? "msvcrt" : "c", LibC.class);
+
+    // Advice values for madvise().
+    // NOTE(review): numeric values presumably follow the Linux headers - confirm per platform.
+    int MADV_WILLNEED = 3;
+    int MADV_DONTNEED = 4;
+
+    // Flags for mlockall().
+    int MCL_CURRENT = 1;
+    int MCL_FUTURE = 2;
+    int MCL_ONFAULT = 4;
+
+    /** sync memory asynchronously */
+    int MS_ASYNC = 0x0001;
+    /** invalidate mappings & caches */
+    int MS_INVALIDATE = 0x0002;
+    /** synchronous memory sync */
+    int MS_SYNC = 0x0004;
+
+    /** Native {@code mlock}: takes a start address and a length. */
+    int mlock(Pointer address, NativeLong length);
+
+    /** Native {@code munlock}: takes a start address and a length. */
+    int munlock(Pointer address, NativeLong length);
+
+    /** Native {@code madvise}: takes a start address, a length and an advice value. */
+    int madvise(Pointer address, NativeLong length, int advice);
+
+    /** Native {@code memset}: takes a destination, a fill value and a length. */
+    Pointer memset(Pointer p, int v, long len);
+
+    /** Native {@code mlockall}: takes a combination of the {@code MCL_*} flags. */
+    int mlockall(int flags);
+
+    /** Native {@code msync}: takes a start address, a length and {@code MS_*} flags. */
+    int msync(Pointer p, NativeLong length, int flags);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/DefaultMessageStoreTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/DefaultMessageStoreTest.java b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/DefaultMessageStoreTest.java
new file mode 100644
index 0000000..ac577c7
--- /dev/null
+++ b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/DefaultMessageStoreTest.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.store;
+
+import com.alibaba.rocketmq.store.config.FlushDiskType;
+import com.alibaba.rocketmq.store.config.MessageStoreConfig;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Writes a batch of messages into a {@link DefaultMessageStore} and reads them back,
+ * once with the default flush settings and once with synchronous flush.
+ *
+ * @author shijia.wxr
+ */
+public class DefaultMessageStoreTest {
+    private static final String StoreMessage = "Once, there was a chance for me!";
+
+    private static int QUEUE_TOTAL = 100;
+
+    private static AtomicInteger QueueId = new AtomicInteger(0);
+
+    private static SocketAddress BornHost;
+
+    private static SocketAddress StoreHost;
+
+    private static byte[] MessageBody;
+
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+        StoreHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123);
+        BornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0);
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() throws Exception {
+    }
+
+    @Test
+    public void test_write_read() throws Exception {
+        MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+        messageStoreConfig.setMapedFileSizeCommitLog(1024 * 8);
+        messageStoreConfig.setMapedFileSizeConsumeQueue(1024 * 4);
+        messageStoreConfig.setMaxHashSlotNum(100);
+        messageStoreConfig.setMaxIndexNum(100 * 10);
+
+        writeThenRead(messageStoreConfig, 100);
+    }
+
+    /**
+     * Builds one message for topic {@code "AAA"} carrying {@code MessageBody}, assigned to the
+     * next queue id modulo {@code QUEUE_TOTAL}.
+     */
+    public MessageExtBrokerInner buildMessage() {
+        MessageExtBrokerInner msg = new MessageExtBrokerInner();
+        msg.setTopic("AAA");
+        msg.setTags("TAG1");
+        // NOTE(review): the second setKeys call presumably replaces "Hello" - confirm and drop one.
+        msg.setKeys("Hello");
+        msg.setBody(MessageBody);
+        msg.setKeys(String.valueOf(System.currentTimeMillis()));
+        msg.setQueueId(Math.abs(QueueId.getAndIncrement()) % QUEUE_TOTAL);
+        msg.setSysFlag(4);
+        msg.setBornTimestamp(System.currentTimeMillis());
+        msg.setStoreHost(StoreHost);
+        msg.setBornHost(BornHost);
+        return msg;
+    }
+
+    @Test
+    public void test_group_commit() throws Exception {
+        MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+        messageStoreConfig.setMapedFileSizeCommitLog(1024 * 8);
+        messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH);
+
+        writeThenRead(messageStoreConfig, 100);
+    }
+
+    /**
+     * Starts a store with the given configuration, puts {@code totalMsgs} messages and then
+     * issues one read per offset.
+     *
+     * <p>Exceptions thrown while reading are no longer caught and printed: they propagate and
+     * fail the test. The store is shut down and destroyed even when an assertion fails.
+     */
+    private void writeThenRead(final MessageStoreConfig messageStoreConfig, final long totalMsgs) throws Exception {
+        System.out.println("================================================================");
+        QUEUE_TOTAL = 1;
+        MessageBody = StoreMessage.getBytes();
+
+        MessageStore master = new DefaultMessageStore(messageStoreConfig, null, null, null);
+
+        boolean load = master.load();
+        assertTrue(load);
+
+        master.start();
+        try {
+            for (long i = 0; i < totalMsgs; i++) {
+                PutMessageResult result = master.putMessage(buildMessage());
+                System.out.println(i + "\t" + result.getAppendMessageResult().getMsgId());
+            }
+
+            // NOTE(review): messages are written to topic "AAA" but read from "TOPIC_A", so this
+            // loop only verifies that getMessage returns a non-null result - confirm the intent.
+            for (long i = 0; i < totalMsgs; i++) {
+                GetMessageResult result = master.getMessage("GROUP_A", "TOPIC_A", 0, i, 1024 * 1024, null);
+                assertTrue("result == null " + i, result != null);
+                result.release();
+                System.out.println("read " + i + " OK");
+            }
+        } finally {
+            master.shutdown();
+            master.destroy();
+        }
+        System.out.println("================================================================");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/MappedFileQueueTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/MappedFileQueueTest.java b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/MappedFileQueueTest.java
new file mode 100644
index 0000000..ce7666f
--- /dev/null
+++ b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/MappedFileQueueTest.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * $Id: MappedFileQueueTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $
+ */
+package com.alibaba.rocketmq.store;
+
+import org.junit.*;
+
+import static org.junit.Assert.*;
+
+
+/**
+ * Exercises {@link MappedFileQueue}: appending through the last mapped file, looking files up
+ * by offset, flushing, and reporting the mapped memory size.
+ *
+ * <p>Every test works in its own directory under {@code target/unit_test_store} and destroys
+ * its queue in a {@code finally} block so a failed assertion does not leave files behind.
+ */
+public class MappedFileQueueTest {
+
+    /** Size of each mapped file and number of messages appended per test. */
+    private static final int UNIT = 1024;
+
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() throws Exception {
+    }
+
+    @Before
+    public void setUp() throws Exception {
+    }
+
+    @After
+    public void tearDown() throws Exception {
+    }
+
+    /** Appends {@code msg} {@code UNIT} times, always through the queue's last mapped file. */
+    private static void fill(final MappedFileQueue mappedFileQueue, final String msg) {
+        final byte[] payload = msg.getBytes();
+        for (int i = 0; i < UNIT; i++) {
+            MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0);
+            assertNotNull(mappedFile);
+            assertTrue("appendMessage " + i, mappedFile.appendMessage(payload));
+        }
+    }
+
+    /** Asserts that {@code offset} resolves to the file starting at {@code expectedFileFromOffset}. */
+    private static void assertFileFromOffset(final MappedFileQueue mappedFileQueue, final long offset,
+                                             final long expectedFileFromOffset) {
+        MappedFile mappedFile = mappedFileQueue.findMappedFileByOffset(offset);
+        assertNotNull(mappedFile);
+        // expected value first, so a failure message reads correctly
+        assertEquals(expectedFileFromOffset, mappedFile.getFileFromOffset());
+        System.out.println(mappedFile.getFileFromOffset());
+    }
+
+    /** Shuts the queue down and deletes its files. */
+    private static void cleanup(final MappedFileQueue mappedFileQueue) {
+        mappedFileQueue.shutdown(1000);
+        mappedFileQueue.destroy();
+    }
+
+    @Test
+    public void test_getLastMapedFile() {
+        final String fixedMsg = "0123456789abcdef";
+        System.out.println("================================================================");
+        MappedFileQueue mappedFileQueue =
+                new MappedFileQueue("target/unit_test_store/a/", UNIT, null);
+        try {
+            fill(mappedFileQueue, fixedMsg);
+        } finally {
+            cleanup(mappedFileQueue);
+        }
+        System.out.println("MappedFileQueue.getLastMappedFile() OK");
+    }
+
+
+    @Test
+    public void test_findMapedFileByOffset() {
+        final String fixedMsg = "abcd";
+        System.out.println("================================================================");
+        MappedFileQueue mappedFileQueue =
+                new MappedFileQueue("target/unit_test_store/b/", UNIT, null);
+        try {
+            // 1024 messages of 4 bytes fill four files of 1024 bytes each.
+            fill(mappedFileQueue, fixedMsg);
+
+            assertFileFromOffset(mappedFileQueue, 0, 0);
+            assertFileFromOffset(mappedFileQueue, 100, 0);
+            assertFileFromOffset(mappedFileQueue, 1024, 1024);
+            assertFileFromOffset(mappedFileQueue, 1024 + 100, 1024);
+            assertFileFromOffset(mappedFileQueue, 1024 * 2, 1024 * 2);
+            assertFileFromOffset(mappedFileQueue, 1024 * 2 + 100, 1024 * 2);
+
+            // Offsets past the written data must not resolve to any file.
+            assertNull(mappedFileQueue.findMappedFileByOffset(1024 * 4));
+            assertNull(mappedFileQueue.findMappedFileByOffset(1024 * 4 + 100));
+        } finally {
+            cleanup(mappedFileQueue);
+        }
+        System.out.println("MappedFileQueue.findMappedFileByOffset() OK");
+    }
+
+    @Test
+    public void test_commit() {
+        final String fixedMsg = "0123456789abcdef";
+        System.out.println("================================================================");
+        MappedFileQueue mappedFileQueue =
+                new MappedFileQueue("target/unit_test_store/c/", UNIT, null);
+        try {
+            fill(mappedFileQueue, fixedMsg);
+
+            // Each flush is expected to return false and advance the flushed position by one file.
+            for (int round = 1; round <= 6; round++) {
+                boolean result = mappedFileQueue.flush(0);
+                assertFalse(result);
+                assertEquals(1024 * round, mappedFileQueue.getFlushedWhere());
+                System.out.println(round + " " + result + " " + mappedFileQueue.getFlushedWhere());
+            }
+        } finally {
+            cleanup(mappedFileQueue);
+        }
+        System.out.println("MappedFileQueue.flush() OK");
+    }
+
+    @Test
+    public void test_getMapedMemorySize() {
+        final String fixedMsg = "abcd";
+        System.out.println("================================================================");
+        MappedFileQueue mappedFileQueue =
+                new MappedFileQueue("target/unit_test_store/d/", UNIT, null);
+        try {
+            fill(mappedFileQueue, fixedMsg);
+
+            assertEquals(fixedMsg.length() * 1024, mappedFileQueue.getMappedMemorySize());
+        } finally {
+            cleanup(mappedFileQueue);
+        }
+        System.out.println("MappedFileQueue.getMappedMemorySize() OK");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/MappedFileTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/MappedFileTest.java b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/MappedFileTest.java
new file mode 100644
index 0000000..94fd5ee
--- /dev/null
+++ b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/MappedFileTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * $Id: MappedFileTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $
+ */
+package com.alibaba.rocketmq.store;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Writes a message into a single {@link MappedFile}, reads it back and checks the
+ * shutdown / release / destroy sequence.
+ */
+public class MappedFileTest {
+
+    private static final String StoreMessage = "Once, there was a chance for me!";
+
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() throws Exception {
+    }
+
+    /**
+     * Appends the message, reads it back from offset 0 and verifies the file can be
+     * shut down, released and destroyed.
+     *
+     * @throws IOException if the mapped file cannot be created; this now fails the test
+     *                     where it was previously only printed
+     */
+    @Test
+    public void test_write_read() throws IOException {
+        final byte[] payload = StoreMessage.getBytes();
+
+        MappedFile mappedFile = new MappedFile("target/unit_test_store/MappedFileTest/000", 1024 * 64);
+        boolean result = mappedFile.appendMessage(payload);
+        assertTrue(result);
+        System.out.println("write OK");
+
+        SelectMappedBufferResult selectMappedBufferResult = mappedFile.selectMappedBuffer(0);
+        // Size the read buffer by the encoded length, not by the character count.
+        byte[] data = new byte[payload.length];
+        selectMappedBufferResult.getByteBuffer().get(data);
+        String readString = new String(data);
+
+        System.out.println("Read: " + readString);
+        assertTrue(readString.equals(StoreMessage));
+
+        // The order below is the behavior under test: shutdown, then release, then destroy.
+        mappedFile.shutdown(1000);
+        assertTrue(!mappedFile.isAvailable());
+        selectMappedBufferResult.release();
+        assertTrue(mappedFile.isCleanupOver());
+        assertTrue(mappedFile.destroy(1000));
+    }
+
+    /**
+     * Reads from a selected buffer after it has been released and the file shut down.
+     *
+     * <p>This method carries {@code @Ignore} but no {@code @Test}, so JUnit does not run it.
+     * NOTE(review): judging by its name the read presumably crashes the JVM - keep it disabled.
+     */
+    @Ignore
+    public void test_jvm_crashed() throws IOException {
+        MappedFile mappedFile = new MappedFile("target/unit_test_store/MappedFileTest/10086", 1024 * 64);
+        boolean result = mappedFile.appendMessage(StoreMessage.getBytes());
+        assertTrue(result);
+        System.out.println("write OK");
+
+        SelectMappedBufferResult selectMappedBufferResult = mappedFile.selectMappedBuffer(0);
+        selectMappedBufferResult.release();
+        mappedFile.shutdown(1000);
+
+        byte[] data = new byte[StoreMessage.length()];
+        selectMappedBufferResult.getByteBuffer().get(data);
+        String readString = new String(data);
+        System.out.println(readString);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/RecoverTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/RecoverTest.java b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/RecoverTest.java
new file mode 100644
index 0000000..ea83375
--- /dev/null
+++ b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/RecoverTest.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * $Id: RecoverTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $
+ */
+package com.alibaba.rocketmq.store;
+
+import com.alibaba.rocketmq.common.message.MessageDecoder;
+import com.alibaba.rocketmq.common.message.MessageExt;
+import com.alibaba.rocketmq.store.config.MessageStoreConfig;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertTrue;
+
+
+public class RecoverTest {
+    private static final String StoreMessage = "Once, there was a chance for me!aaaaaaaaaaaaaaaaaaaaaaaa";
+
+    private static int QUEUE_TOTAL = 10;
+
+    private static AtomicInteger QueueId = new AtomicInteger(0);
+
+    private static SocketAddress BornHost;
+
+    private static SocketAddress StoreHost;
+
+    private static byte[] MessageBody;
+    private MessageStore storeWrite1;
+    private MessageStore storeWrite2;
+    private MessageStore storeRead;
+
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+        StoreHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123);
+        BornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0);
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() throws Exception {
+    }
+
+    @Test
+    public void test_recover_normally() throws Exception {
+        this.writeMessage(true, true);
+        Thread.sleep(1000 * 3);
+        this.readMessage(1000);
+        this.destroy();
+    }
+
+    public void writeMessage(boolean normal, boolean first) throws Exception {
+        System.out.println("================================================================");
+        long totalMsgs = 1000; // per call: the tests assert readMessage(1000) after one write and readMessage(2000) after two
+        QUEUE_TOTAL = 3;
+        // Store settings: 32 KiB commit log files, 2000-byte consume queue files, message index disabled.
+        MessageBody = StoreMessage.getBytes();
+        MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+        messageStoreConfig.setMapedFileSizeCommitLog(1024 * 32);
+        messageStoreConfig.setMapedFileSizeConsumeQueue(100 * 20);
+        messageStoreConfig.setMessageIndexEnable(false);
+        // Remember the store in storeWrite1 (first == true) or storeWrite2 so destroy() can shut it down and destroy it.
+        MessageStore messageStore = new DefaultMessageStore(messageStoreConfig, null, null, null);
+        if (first) {
+            this.storeWrite1 = messageStore;
+        } else {
+            this.storeWrite2 = messageStore;
+        }
+        // Load and start the store, then put totalMsgs messages; buildMessage() rotates the queue id over QUEUE_TOTAL queues.
+        boolean loadResult = messageStore.load();
+        assertTrue(loadResult);
+        messageStore.start();
+        for (long i = 0; i < totalMsgs; i++) {
+            PutMessageResult result = messageStore.putMessage(buildMessage());
+            System.out.println(i + "\t" + result.getAppendMessageResult().getMsgId());
+        }
+        // normal == false skips shutdown(), leaving the store un-closed for the "abnormally" recovery tests.
+        if (normal) {
+            messageStore.shutdown();
+        }
+        System.out.println("========================writeMessage OK========================================");
+    }
+
+    public void readMessage(final long msgCnt) throws Exception {
+        System.out.println("================================================================");
+        QUEUE_TOTAL = 3;
+        MessageBody = StoreMessage.getBytes();
+        MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); // same file sizes and index setting as writeMessage()
+        messageStoreConfig.setMapedFileSizeCommitLog(1024 * 32);
+        messageStoreConfig.setMapedFileSizeConsumeQueue(100 * 20);
+        messageStoreConfig.setMessageIndexEnable(false);
+        storeRead = new DefaultMessageStore(messageStoreConfig, null, null, null);
+        boolean loadResult = storeRead.load();
+        assertTrue(loadResult);
+        storeRead.start();
+        // Drain each queue from offset 0: every FOUND batch is verified, counted and released; any other status ends that queue.
+        long readCnt = 0;
+        for (int queueId = 0; queueId < QUEUE_TOTAL; queueId++) {
+            for (long offset = 0; ; ) {
+                GetMessageResult result = storeRead.getMessage("GROUP_A", "TOPIC_A", queueId, offset, 1024 * 1024, null);
+                if (result.getStatus() == GetMessageStatus.FOUND) {
+                    System.out.println(queueId + "\t" + result.getMessageCount());
+                    this.veryReadMessage(queueId, offset, result.getMessageBufferList());
+                    offset += result.getMessageCount();
+                    readCnt += result.getMessageCount();
+                    result.release();
+                } else {
+                    break;
+                }
+            }
+        }
+        // The total read across all queues must equal the count the caller expects (msgCnt).
+        System.out.println("readCnt = " + readCnt);
+        assertTrue(readCnt == msgCnt);
+        System.out.println("========================readMessage OK========================================");
+    }
+
+    private void destroy() {
+        if (storeWrite1 != null) {
+            storeWrite1.shutdown();
+            storeWrite1.destroy();
+        }
+
+        if (storeWrite2 != null) {
+            storeWrite2.shutdown();
+            storeWrite2.destroy();
+        }
+
+        if (storeRead != null) {
+            storeRead.shutdown();
+            storeRead.destroy();
+        }
+    }
+
+    public MessageExtBrokerInner buildMessage() {
+        MessageExtBrokerInner msg = new MessageExtBrokerInner();
+        msg.setTopic("TOPIC_A");
+        msg.setTags("TAG1");
+        msg.setKeys("Hello");
+        msg.setBody(MessageBody);
+        msg.setKeys(String.valueOf(System.currentTimeMillis()));
+        msg.setQueueId(Math.abs(QueueId.getAndIncrement()) % QUEUE_TOTAL);
+        msg.setSysFlag(4);
+        msg.setBornTimestamp(System.currentTimeMillis());
+        msg.setStoreHost(StoreHost);
+        msg.setBornHost(BornHost);
+
+        return msg;
+    }
+
+    private void veryReadMessage(int queueId, long queueOffset, List<ByteBuffer> byteBuffers) {
+        for (ByteBuffer byteBuffer : byteBuffers) {
+            MessageExt msg = MessageDecoder.decode(byteBuffer);
+            System.out.println("request queueId " + queueId + ", request queueOffset " + queueOffset + " msg queue offset "
+                    + msg.getQueueOffset());
+            // Each decoded message must carry the queue offset expected at its position in the batch (queueId is only logged).
+            assertTrue(msg.getQueueOffset() == queueOffset);
+            // The next buffer is expected to hold the following queue offset.
+            queueOffset++;
+        }
+    }
+
+    @Test
+    public void test_recover_normally_write() throws Exception {
+        this.writeMessage(true, true);
+        Thread.sleep(1000 * 3);
+        this.writeMessage(true, false);
+        Thread.sleep(1000 * 3);
+        this.readMessage(2000);
+        this.destroy();
+    }
+
+    @Test
+    public void test_recover_abnormally() throws Exception {
+        this.writeMessage(false, true);
+        Thread.sleep(1000 * 3);
+        this.readMessage(1000);
+        this.destroy();
+    }
+
+    @Test
+    public void test_recover_abnormally_write() throws Exception {
+        this.writeMessage(false, true);
+        Thread.sleep(1000 * 3);
+        this.writeMessage(false, false);
+        Thread.sleep(1000 * 3);
+        this.readMessage(2000);
+        this.destroy();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/StoreCheckpointTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/StoreCheckpointTest.java b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/StoreCheckpointTest.java
new file mode 100644
index 0000000..e0a550d
--- /dev/null
+++ b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/StoreCheckpointTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * $Id: StoreCheckpointTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $
+ */
+package com.alibaba.rocketmq.store;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+
+public class StoreCheckpointTest {
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() throws Exception {
+    }
+
+    @Test
+    public void test_write_read() {
+        try {
+            StoreCheckpoint storeCheckpoint = new StoreCheckpoint("target/checkpoint_test/0000");
+            long physicMsgTimestamp = 0xAABB;
+            long logicsMsgTimestamp = 0xCCDD;
+            storeCheckpoint.setPhysicMsgTimestamp(physicMsgTimestamp);
+            storeCheckpoint.setLogicsMsgTimestamp(logicsMsgTimestamp);
+            storeCheckpoint.flush();
+            // Asserts that getMinTimestamp() is exactly 3000 below physicMsgTimestamp (the smaller of the two values set above).
+            long diff = physicMsgTimestamp - storeCheckpoint.getMinTimestamp();
+            assertTrue(diff == 3000);
+            storeCheckpoint.shutdown();
+            storeCheckpoint = new StoreCheckpoint("target/checkpoint_test/0000"); // reopen same path; NOTE(review): this instance is never shut down
+            assertTrue(physicMsgTimestamp == storeCheckpoint.getPhysicMsgTimestamp());
+            assertTrue(logicsMsgTimestamp == storeCheckpoint.getLogicsMsgTimestamp());
+        } catch (Throwable e) {
+            e.printStackTrace(); // Throwable also covers AssertionError from the asserts above; it is printed, then the test fails below
+            assertTrue(false);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/index/IndexFileTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/index/IndexFileTest.java b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/index/IndexFileTest.java
new file mode 100644
index 0000000..288b87e
--- /dev/null
+++ b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/index/IndexFileTest.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * $Id: IndexFileTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $
+ */
+package com.alibaba.rocketmq.store.index;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+
+public class IndexFileTest {
+    private static final int hashSlotNum = 100;
+    private static final int indexNum = 400;
+
+    @Test
+    public void test_put_index() {
+        try {
+            IndexFile indexFile = new IndexFile("100", hashSlotNum, indexNum, 0, 0);
+            for (long i = 0; i < (indexNum - 1); i++) {
+                boolean putResult = indexFile.putKey(Long.toString(i), i, System.currentTimeMillis());
+                assertTrue(putResult);
+            }
+            // After indexNum - 1 successful puts, one further put is expected to be refused.
+            boolean putResult = indexFile.putKey(Long.toString(400), 400, System.currentTimeMillis());
+            assertFalse(putResult);
+            // Clean-up; not reached when an assertion above fails.
+            indexFile.destroy(0);
+        } catch (Exception e) {
+            e.printStackTrace();
+            assertTrue(false);
+        }
+    }
+
+    /** Same fill sequence as test_put_index, then checks that a lookup of key "60" yields at least one offset. */
+    @Test
+    public void test_put_get_index() {
+        try {
+            IndexFile indexFile = new IndexFile("200", hashSlotNum, indexNum, 0, 0);
+            // Put indexNum - 1 keys (each must succeed), then expect the next put to be refused.
+            for (long i = 0; i < (indexNum - 1); i++) {
+                boolean putResult = indexFile.putKey(Long.toString(i), i, System.currentTimeMillis());
+                assertTrue(putResult);
+            }
+            boolean putResult = indexFile.putKey(Long.toString(400), 400, System.currentTimeMillis());
+            assertFalse(putResult);
+            // Collect the offsets selectPhyOffset reports for key "60" (max 10, time range 0..Long.MAX_VALUE) and print them.
+            final List<Long> phyOffsets = new ArrayList<Long>();
+            indexFile.selectPhyOffset(phyOffsets, "60", 10, 0, Long.MAX_VALUE, true);
+            for (Long offset : phyOffsets) {
+                System.out.println(offset);
+            }
+            assertFalse(phyOffsets.isEmpty());
+            indexFile.destroy(0);
+        } catch (Exception e) {
+            e.printStackTrace();
+            assertTrue(false);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/schedule/ScheduleMessageTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/schedule/ScheduleMessageTest.java b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/schedule/ScheduleMessageTest.java
new file mode 100644
index 0000000..d7de738
--- /dev/null
+++ b/rocketmq-store/src/test/java/com/alibaba/rocketmq/store/schedule/ScheduleMessageTest.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * $Id: ScheduleMessageTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $
+ */
+package com.alibaba.rocketmq.store.schedule;
+
+import com.alibaba.rocketmq.store.*;
+import com.alibaba.rocketmq.store.config.MessageStoreConfig;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertTrue;
+
+@Ignore
+public class ScheduleMessageTest {
+    private static final String StoreMessage = "Once, there was a chance for me!";
+
+    private static int QUEUE_TOTAL = 100;
+
+    private static AtomicInteger QueueId = new AtomicInteger(0);
+
+    private static SocketAddress BornHost;
+
+    private static SocketAddress StoreHost;
+
+    private static byte[] MessageBody;
+
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+        StoreHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123);
+        BornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0);
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() throws Exception {
+    }
+
+    @Test
+    public void test_delay_message() throws Exception {
+        System.out.println("================================================================");
+        long totalMsgs = 10000;
+        QUEUE_TOTAL = 32;
+        // NOTE(review): buildMessage() publishes to topic "AAA" while the read loop below queries "TOPIC_A" - confirm intent.
+
+        MessageBody = StoreMessage.getBytes();
+
+        MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+        messageStoreConfig.setMapedFileSizeCommitLog(1024 * 32);
+        messageStoreConfig.setMapedFileSizeConsumeQueue(1024 * 16);
+        messageStoreConfig.setMaxHashSlotNum(100);
+        messageStoreConfig.setMaxIndexNum(1000 * 10);
+
+        MessageStore master = new DefaultMessageStore(messageStoreConfig, null, null, null);
+
+        boolean load = master.load();
+        assertTrue(load);
+        // Start the store and put totalMsgs messages, cycling the delay time level through 0..3.
+
+        master.start();
+        for (int i = 0; i < totalMsgs; i++) {
+            MessageExtBrokerInner msg = buildMessage();
+            msg.setDelayTimeLevel(i % 4);
+
+            PutMessageResult result = master.putMessage(msg);
+            System.out.println(i + "\t" + result.getAppendMessageResult().getMsgId());
+        }
+
+        System.out.println("write message over, wait time up");
+        Thread.sleep(1000 * 20);
+        // Probe queue 0 at every offset; only a non-null result is asserted, the returned status is not checked.
+
+        for (long i = 0; i < totalMsgs; i++) {
+            try {
+                GetMessageResult result = master.getMessage("GROUP_A", "TOPIC_A", 0, i, 1024 * 1024, null);
+                if (result == null) {
+                    System.out.println("result == null " + i);
+                }
+                assertTrue(result != null);
+                result.release();
+                System.out.println("read " + i + " OK");
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+
+        }
+
+        Thread.sleep(1000 * 15);
+        // Tear down: shut the store down, then destroy it.
+
+        master.shutdown();
+
+
+        master.destroy();
+        System.out.println("================================================================");
+    }
+
+    public MessageExtBrokerInner buildMessage() {
+        MessageExtBrokerInner msg = new MessageExtBrokerInner();
+        msg.setTopic("AAA");
+        msg.setTags("TAG1");
+        msg.setKeys("Hello");
+        msg.setBody(MessageBody);
+        msg.setKeys(String.valueOf(System.currentTimeMillis()));
+        msg.setQueueId(Math.abs(QueueId.getAndIncrement()) % QUEUE_TOTAL);
+        msg.setSysFlag(4);
+        msg.setBornTimestamp(System.currentTimeMillis());
+        msg.setStoreHost(StoreHost);
+        msg.setBornHost(BornHost);
+
+        return msg;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/pom.xml
----------------------------------------------------------------------
diff --git a/rocketmq-tools/pom.xml b/rocketmq-tools/pom.xml
new file mode 100644
index 0000000..5070a68
--- /dev/null
+++ b/rocketmq-tools/pom.xml
@@ -0,0 +1,66 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>com.alibaba.rocketmq</groupId>
+        <artifactId>rocketmq-all</artifactId>
+        <version>4.0.0-SNAPSHOT</version>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <packaging>jar</packaging>
+    <artifactId>rocketmq-tools</artifactId>
+    <name>rocketmq-tools ${project.version}</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>rocketmq-client</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>rocketmq-store</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>rocketmq-srvutil</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/DefaultMQAdminExt.java
----------------------------------------------------------------------
diff --git a/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/DefaultMQAdminExt.java b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/DefaultMQAdminExt.java
new file mode 100644
index 0000000..4576886
--- /dev/null
+++ b/rocketmq-tools/src/main/java/com/alibaba/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -0,0 +1,458 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.tools.admin;
+
+import com.alibaba.rocketmq.client.ClientConfig;
+import com.alibaba.rocketmq.client.QueryResult;
+import com.alibaba.rocketmq.client.exception.MQBrokerException;
+import com.alibaba.rocketmq.client.exception.MQClientException;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.TopicConfig;
+import com.alibaba.rocketmq.common.admin.ConsumeStats;
+import com.alibaba.rocketmq.common.admin.RollbackStats;
+import com.alibaba.rocketmq.common.admin.TopicStatsTable;
+import com.alibaba.rocketmq.common.message.MessageExt;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.common.protocol.body.*;
+import com.alibaba.rocketmq.common.protocol.route.TopicRouteData;
+import com.alibaba.rocketmq.common.subscription.SubscriptionGroupConfig;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.remoting.exception.*;
+import com.alibaba.rocketmq.tools.admin.api.MessageTrack;
+
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
+    private final DefaultMQAdminExtImpl defaultMQAdminExtImpl;
+    private String adminExtGroup = "admin_ext_group";
+    private String createTopicKey = MixAll.DEFAULT_TOPIC;
+    private long timeoutMillis = 5000;
+
+    public DefaultMQAdminExt() {
+        this.defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(this, null, timeoutMillis);
+    }
+
+    public DefaultMQAdminExt(long timeoutMillis) { // NOTE(review): parameter shadows the field; this.timeoutMillis is not assigned and stays at its 5000 initializer
+        this.defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(this, null, timeoutMillis);
+    }
+
+    public DefaultMQAdminExt(RPCHook rpcHook) {
+        this.defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(this, rpcHook, timeoutMillis);
+    }
+
+    public DefaultMQAdminExt(RPCHook rpcHook, long timeoutMillis) { // NOTE(review): parameter shadows the field; this.timeoutMillis is not assigned and stays at its 5000 initializer
+        this.defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(this, rpcHook, timeoutMillis);
+    }
+
+    public DefaultMQAdminExt(final String adminExtGroup) {
+        this.adminExtGroup = adminExtGroup;
+        this.defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(this, timeoutMillis);
+    }
+
+    public DefaultMQAdminExt(final String adminExtGroup, long timeoutMillis) { // NOTE(review): parameter shadows the field; this.timeoutMillis is not assigned and stays at its 5000 initializer
+        this.adminExtGroup = adminExtGroup;
+        this.defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(this, timeoutMillis);
+    }
+
+    @Override
+    public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
+        createTopic(key, newTopic, queueNum, 0);
+    }
+
+    @Override
+    public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
+        defaultMQAdminExtImpl.createTopic(key, newTopic, queueNum, topicSysFlag);
+    }
+
+    @Override
+    public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
+        return defaultMQAdminExtImpl.searchOffset(mq, timestamp);
+    }
+
+    @Override
+    public long maxOffset(MessageQueue mq) throws MQClientException {
+        return defaultMQAdminExtImpl.maxOffset(mq);
+    }
+
+
+    @Override
+    public long minOffset(MessageQueue mq) throws MQClientException {
+        return defaultMQAdminExtImpl.minOffset(mq);
+    }
+
+    @Override
+    public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
+        return defaultMQAdminExtImpl.earliestMsgStoreTime(mq);
+    }
+
+    @Override
+    public MessageExt viewMessage(String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        return defaultMQAdminExtImpl.viewMessage(offsetMsgId);
+    }
+
+    @Override
+    public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) throws MQClientException,
+            InterruptedException {
+        return defaultMQAdminExtImpl.queryMessage(topic, key, maxNum, begin, end);
+    }
+
+    @Override
+    public void start() throws MQClientException {
+        defaultMQAdminExtImpl.start();
+    }
+
+    @Override
+    public void shutdown() {
+        defaultMQAdminExtImpl.shutdown();
+    }
+
+    @Override
+    public void updateBrokerConfig(String brokerAddr, Properties properties) throws RemotingConnectException, RemotingSendRequestException,
+            RemotingTimeoutException, UnsupportedEncodingException, InterruptedException, MQBrokerException {
+        defaultMQAdminExtImpl.updateBrokerConfig(brokerAddr, properties);
+    }
+
+    @Override
+    public Properties getBrokerConfig(final String brokerAddr) throws RemotingConnectException,
+            RemotingSendRequestException, RemotingTimeoutException, UnsupportedEncodingException, InterruptedException, MQBrokerException {
+        return defaultMQAdminExtImpl.getBrokerConfig(brokerAddr);
+    }
+
+    @Override
+    public void createAndUpdateTopicConfig(String addr, TopicConfig config) throws RemotingException, MQBrokerException,
+            InterruptedException, MQClientException {
+        defaultMQAdminExtImpl.createAndUpdateTopicConfig(addr, config);
+    }
+
+    @Override
+    public void createAndUpdateSubscriptionGroupConfig(String addr, SubscriptionGroupConfig config) throws RemotingException,
+            MQBrokerException, InterruptedException, MQClientException {
+        defaultMQAdminExtImpl.createAndUpdateSubscriptionGroupConfig(addr, config);
+    }
+
+    @Override
+    public SubscriptionGroupConfig examineSubscriptionGroupConfig(String addr, String group) {
+        return defaultMQAdminExtImpl.examineSubscriptionGroupConfig(addr, group);
+    }
+
+    @Override
+    public TopicConfig examineTopicConfig(String addr, String topic) {
+        return defaultMQAdminExtImpl.examineTopicConfig(addr, topic);
+    }
+
+    @Override
+    public TopicStatsTable examineTopicStats(String topic) throws RemotingException, MQClientException, InterruptedException,
+            MQBrokerException {
+        return defaultMQAdminExtImpl.examineTopicStats(topic);
+    }
+
+    @Override
+    public TopicList fetchAllTopicList() throws RemotingException, MQClientException, InterruptedException {
+        return this.defaultMQAdminExtImpl.fetchAllTopicList();
+    }
+
+    @Override
+    public TopicList fetchTopicsByCLuster(String clusterName) throws RemotingException, MQClientException, InterruptedException {
+        return this.defaultMQAdminExtImpl.fetchTopicsByCLuster(clusterName);
+    }
+
+    @Override
+    public KVTable fetchBrokerRuntimeStats(final String brokerAddr) throws RemotingConnectException, RemotingSendRequestException,
+            RemotingTimeoutException, InterruptedException, MQBrokerException {
+        return this.defaultMQAdminExtImpl.fetchBrokerRuntimeStats(brokerAddr);
+    }
+
+    @Override
+    public ConsumeStats examineConsumeStats(String consumerGroup) throws RemotingException, MQClientException, InterruptedException,
+            MQBrokerException {
+        return examineConsumeStats(consumerGroup, null);
+    }
+
+    @Override
+    public ConsumeStats examineConsumeStats(String consumerGroup, String topic) throws RemotingException, MQClientException,
+            InterruptedException, MQBrokerException {
+        return defaultMQAdminExtImpl.examineConsumeStats(consumerGroup, topic);
+    }
+
+    @Override
+    public ClusterInfo examineBrokerClusterInfo() throws InterruptedException, RemotingConnectException, RemotingTimeoutException,
+            RemotingSendRequestException, MQBrokerException {
+        return defaultMQAdminExtImpl.examineBrokerClusterInfo();
+    }
+
+    @Override
+    public TopicRouteData examineTopicRouteInfo(String topic) throws RemotingException, MQClientException, InterruptedException {
+        return defaultMQAdminExtImpl.examineTopicRouteInfo(topic);
+    }
+
+    @Override
+    public ConsumerConnection examineConsumerConnectionInfo(String consumerGroup) throws InterruptedException, MQBrokerException,
+            RemotingException, MQClientException {
+        return defaultMQAdminExtImpl.examineConsumerConnectionInfo(consumerGroup);
+    }
+
+    @Override
+    public ProducerConnection examineProducerConnectionInfo(String producerGroup, final String topic) throws RemotingException,
+            MQClientException, InterruptedException, MQBrokerException {
+        return defaultMQAdminExtImpl.examineProducerConnectionInfo(producerGroup, topic);
+    }
+
+    /**
+     * Delegates to {@code defaultMQAdminExtImpl.getNameServerAddressList}.
+     *
+     * @return the list returned by the delegate, passed through without copying
+     */
+    @Override
+    public List<String> getNameServerAddressList() {
+        final List<String> addresses = defaultMQAdminExtImpl.getNameServerAddressList();
+        return addresses;
+    }
+
+    @Override
+    public int wipeWritePermOfBroker(final String namesrvAddr, String brokerName) throws RemotingCommandException,
+            RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException {
+        return defaultMQAdminExtImpl.wipeWritePermOfBroker(namesrvAddr, brokerName);
+    }
+
+    /**
+     * Delegates to {@code defaultMQAdminExtImpl.putKVConfig}.
+     *
+     * @param namespace namespace, forwarded unchanged
+     * @param key       key, forwarded unchanged
+     * @param value     value, forwarded unchanged
+     */
+    @Override
+    public void putKVConfig(String namespace, String key, String value) {
+        this.defaultMQAdminExtImpl.putKVConfig(namespace, key, value);
+    }
+
+    @Override
+    public String getKVConfig(String namespace, String key) throws RemotingException, MQClientException, InterruptedException {
+        return defaultMQAdminExtImpl.getKVConfig(namespace, key);
+    }
+
+    @Override
+    public KVTable getKVListByNamespace(String namespace) throws RemotingException, MQClientException, InterruptedException {
+        return defaultMQAdminExtImpl.getKVListByNamespace(namespace);
+    }
+
+    @Override
+    public void deleteTopicInBroker(Set<String> addrs, String topic) throws RemotingException, MQBrokerException, InterruptedException,
+            MQClientException {
+        defaultMQAdminExtImpl.deleteTopicInBroker(addrs, topic);
+    }
+
+    @Override
+    public void deleteTopicInNameServer(Set<String> addrs, String topic) throws RemotingException, MQBrokerException, InterruptedException,
+            MQClientException {
+        defaultMQAdminExtImpl.deleteTopicInNameServer(addrs, topic);
+    }
+
+    @Override
+    public void deleteSubscriptionGroup(String addr, String groupName) throws RemotingException, MQBrokerException, InterruptedException,
+            MQClientException {
+        defaultMQAdminExtImpl.deleteSubscriptionGroup(addr, groupName);
+    }
+
+    @Override
+    public void createAndUpdateKvConfig(String namespace, String key, String value) throws RemotingException, MQBrokerException,
+            InterruptedException, MQClientException {
+        defaultMQAdminExtImpl.createAndUpdateKvConfig(namespace, key, value);
+    }
+
+    @Override
+    public void deleteKvConfig(String namespace, String key) throws RemotingException, MQBrokerException, InterruptedException,
+            MQClientException {
+        defaultMQAdminExtImpl.deleteKvConfig(namespace, key);
+    }
+
+    public List<RollbackStats> resetOffsetByTimestampOld(String consumerGroup, String topic, long timestamp, boolean force)
+            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        return defaultMQAdminExtImpl.resetOffsetByTimestampOld(consumerGroup, topic, timestamp, force);
+    }
+
+    @Override
+    public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce)
+            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        return resetOffsetByTimestamp(topic, group, timestamp, isForce, false);
+    }
+
+    public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce, boolean isC)
+            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        return defaultMQAdminExtImpl.resetOffsetByTimestamp(topic, group, timestamp, isForce, isC);
+    }
+
+    @Override
+    public void resetOffsetNew(String consumerGroup, String topic, long timestamp) throws RemotingException, MQBrokerException,
+            InterruptedException, MQClientException {
+        this.defaultMQAdminExtImpl.resetOffsetNew(consumerGroup, topic, timestamp);
+    }
+
+    @Override
+    public Map<String, Map<MessageQueue, Long>> getConsumeStatus(String topic, String group, String clientAddr) throws RemotingException,
+            MQBrokerException, InterruptedException, MQClientException {
+        return defaultMQAdminExtImpl.getConsumeStatus(topic, group, clientAddr);
+    }
+
+    @Override
+    public void createOrUpdateOrderConf(String key, String value, boolean isCluster) throws RemotingException, MQBrokerException,
+            InterruptedException, MQClientException {
+        defaultMQAdminExtImpl.createOrUpdateOrderConf(key, value, isCluster);
+    }
+
+    @Override
+    public GroupList queryTopicConsumeByWho(String topic) throws InterruptedException, MQBrokerException, RemotingException,
+            MQClientException {
+        return this.defaultMQAdminExtImpl.queryTopicConsumeByWho(topic);
+    }
+
+    @Override
+    public List<QueueTimeSpan> queryConsumeTimeSpan(final String topic, final String group) throws InterruptedException, MQBrokerException,
+            RemotingException, MQClientException {
+        return this.defaultMQAdminExtImpl.queryConsumeTimeSpan(topic, group);
+    }
+
+    @Override
+    public boolean cleanExpiredConsumerQueue(String cluster) throws RemotingConnectException, RemotingSendRequestException,
+            RemotingTimeoutException, MQClientException, InterruptedException {
+        return defaultMQAdminExtImpl.cleanExpiredConsumerQueue(cluster);
+    }
+
+    @Override
+    public boolean cleanExpiredConsumerQueueByAddr(String addr) throws RemotingConnectException, RemotingSendRequestException,
+            RemotingTimeoutException, MQClientException, InterruptedException {
+        return defaultMQAdminExtImpl.cleanExpiredConsumerQueueByAddr(addr);
+    }
+
+    /**
+     * Delegates to {@code defaultMQAdminExtImpl.cleanUnusedTopic}.
+     *
+     * <p>This cluster-level entry point previously forwarded its argument to
+     * {@code cleanUnusedTopicByAddr}, so a cluster name was passed where an address is
+     * expected. It now calls the same-named delegate, matching how
+     * {@link #cleanExpiredConsumerQueue(String)} forwards to its delegate.
+     *
+     * @param cluster cluster name, forwarded unchanged
+     * @return the boolean returned by the delegate
+     */
+    @Override
+    public boolean cleanUnusedTopic(String cluster) throws RemotingConnectException, RemotingSendRequestException,
+            RemotingTimeoutException, MQClientException, InterruptedException {
+        return defaultMQAdminExtImpl.cleanUnusedTopic(cluster);
+    }
+
+    @Override
+    public boolean cleanUnusedTopicByAddr(String addr) throws RemotingConnectException, RemotingSendRequestException,
+            RemotingTimeoutException, MQClientException, InterruptedException {
+        return defaultMQAdminExtImpl.cleanUnusedTopicByAddr(addr);
+    }
+
+    @Override
+    public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack) throws RemotingException,
+            MQClientException, InterruptedException {
+        return defaultMQAdminExtImpl.getConsumerRunningInfo(consumerGroup, clientId, jstack);
+    }
+
+    @Override
+    public ConsumeMessageDirectlyResult consumeMessageDirectly(String consumerGroup, String clientId, String msgId)
+            throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+        return defaultMQAdminExtImpl.consumeMessageDirectly(consumerGroup, clientId, msgId);
+    }
+
+    @Override
+    public ConsumeMessageDirectlyResult consumeMessageDirectly(final String consumerGroup, final String clientId, final String topic, final String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+        return defaultMQAdminExtImpl.consumeMessageDirectly(consumerGroup, clientId, topic, msgId);
+    }
+
+    @Override
+    public List<MessageTrack> messageTrackDetail(MessageExt msg) throws RemotingException, MQClientException, InterruptedException,
+            MQBrokerException {
+        return this.defaultMQAdminExtImpl.messageTrackDetail(msg);
+    }
+
+    @Override
+    public void cloneGroupOffset(String srcGroup, String destGroup, String topic, boolean isOffline) throws RemotingException,
+            MQClientException, InterruptedException, MQBrokerException {
+        this.defaultMQAdminExtImpl.cloneGroupOffset(srcGroup, destGroup, topic, isOffline);
+    }
+
+    @Override
+    public BrokerStatsData viewBrokerStatsData(String brokerAddr, String statsName, String statsKey) throws RemotingConnectException,
+            RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
+        return this.defaultMQAdminExtImpl.viewBrokerStatsData(brokerAddr, statsName, statsKey);
+    }
+
+    @Override
+    public Set<String> getClusterList(String topic) throws RemotingConnectException, RemotingSendRequestException,
+            RemotingTimeoutException, MQClientException, InterruptedException {
+        return this.defaultMQAdminExtImpl.getClusterList(topic);
+    }
+
+    @Override
+    public ConsumeStatsList fetchConsumeStatsInBroker(final String brokerAddr, boolean isOrder, long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException,
+            RemotingTimeoutException, MQClientException, InterruptedException {
+        return this.defaultMQAdminExtImpl.fetchConsumeStatsInBroker(brokerAddr, isOrder, timeoutMillis);
+    }
+
+    @Override
+    public Set<String> getTopicClusterList(final String topic) throws InterruptedException, MQBrokerException, MQClientException, RemotingException {
+        return this.defaultMQAdminExtImpl.getTopicClusterList(topic);
+    }
+
+    @Override
+    public SubscriptionGroupWrapper getAllSubscriptionGroup(final String brokerAddr, long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
+            RemotingConnectException, MQBrokerException {
+        return this.defaultMQAdminExtImpl.getAllSubscriptionGroup(brokerAddr, timeoutMillis);
+    }
+
+    @Override
+    public TopicConfigSerializeWrapper getAllTopicGroup(final String brokerAddr, long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
+            RemotingConnectException, MQBrokerException {
+        return this.defaultMQAdminExtImpl.getAllTopicGroup(brokerAddr, timeoutMillis);
+    }
+
+
+    /**
+     * Delegates to {@code defaultMQAdminExtImpl.viewMessage}.
+     *
+     * <p>NOTE(review): the earlier comment on this method pointed at
+     * {@code com.alibaba.rocketmq.client.MQAdmin#queryMessageByUniqKey(String, String)},
+     * which does not match this method's name; confirm which interface method is overridden.
+     *
+     * @param topic topic, forwarded unchanged
+     * @param msgId message id, forwarded unchanged
+     * @return the {@link MessageExt} returned by the delegate
+     */
+    @Override
+    public MessageExt viewMessage(String topic, String msgId)
+            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        final MessageExt message = defaultMQAdminExtImpl.viewMessage(topic, msgId);
+        return message;
+    }
+
+    /**
+     * @return the current value of the {@code adminExtGroup} field
+     */
+    public String getAdminExtGroup() {
+        return this.adminExtGroup;
+    }
+
+    /**
+     * Replaces the value of the {@code adminExtGroup} field.
+     *
+     * @param adminExtGroup new value, stored as given without validation
+     */
+    public void setAdminExtGroup(String adminExtGroup) {
+        this.adminExtGroup = adminExtGroup;
+    }
+
+    /**
+     * @return the current value of the {@code createTopicKey} field
+     */
+    public String getCreateTopicKey() {
+        return this.createTopicKey;
+    }
+
+    /**
+     * Replaces the value of the {@code createTopicKey} field.
+     *
+     * @param createTopicKey new value, stored as given without validation
+     */
+    public void setCreateTopicKey(String createTopicKey) {
+        this.createTopicKey = createTopicKey;
+    }
+
+    @Override
+    public void updateConsumeOffset(String brokerAddr, String consumeGroup, MessageQueue mq, long offset) throws RemotingException, InterruptedException, MQBrokerException {
+        this.defaultMQAdminExtImpl.updateConsumeOffset(brokerAddr, consumeGroup, mq, offset);
+    }
+
+    /**
+     * Delegates to {@code defaultMQAdminExtImpl.updateNameServerConfig}.
+     *
+     * @param properties  properties, forwarded unchanged
+     * @param nameServers list of name servers, forwarded unchanged
+     */
+    @Override
+    public void updateNameServerConfig(final Properties properties, final List<String> nameServers)
+            throws InterruptedException, RemotingConnectException, UnsupportedEncodingException,
+            MQBrokerException, RemotingTimeoutException, MQClientException, RemotingSendRequestException {
+        defaultMQAdminExtImpl.updateNameServerConfig(properties, nameServers);
+    }
+
+    /**
+     * Delegates to {@code defaultMQAdminExtImpl.getNameServerConfig}.
+     *
+     * @param nameServers list of name servers, forwarded unchanged
+     * @return the map returned by the delegate
+     */
+    @Override
+    public Map<String, Properties> getNameServerConfig(final List<String> nameServers)
+            throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
+            RemotingConnectException, MQClientException, UnsupportedEncodingException {
+        final Map<String, Properties> configs = defaultMQAdminExtImpl.getNameServerConfig(nameServers);
+        return configs;
+    }
+}