You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 09:14:34 UTC

[43/99] [abbrv] [partial] incubator-rocketmq git commit: ROCKETMQ-18 Rename package name from com.alibaba to org.apache

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java
new file mode 100644
index 0000000..1c40c0e
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.filtersrv;
+
+import org.slf4j.Logger;
+
+
+/**
+ * Utility for running an external command line as a child process.
+ */
+public class FilterServerUtil {
+    /**
+     * Runs {@code shellString} as a child process and blocks until it exits.
+     * <p>
+     * The command line is split on whitespace; quoting and escaping are not interpreted.
+     * Every failure is written to {@code log}; nothing is thrown to the caller.
+     * <p>
+     * NOTE(review): the child's output streams are never drained, so a very chatty
+     * command could block on a full pipe - confirm the commands used here stay quiet.
+     *
+     * @param shellString command line to execute
+     * @param log logger that receives the outcome
+     */
+    public static void callShell(final String shellString, final Logger log) {
+        Process process = null;
+        try {
+            String[] cmdArray = splitShellString(shellString);
+            process = Runtime.getRuntime().exec(cmdArray);
+            int exitCode = process.waitFor();
+            if (0 == exitCode) {
+                log.info("callShell: <{}> OK", shellString);
+            } else {
+                log.warn("callShell: <{}> exited with code {}", shellString, exitCode);
+            }
+        } catch (InterruptedException e) {
+            // Restore the flag so the calling thread can still observe the interruption.
+            Thread.currentThread().interrupt();
+            log.error("callShell: interrupted while waiting for " + shellString, e);
+        } catch (Throwable e) {
+            log.error("callShell: failed to execute " + shellString, e);
+        } finally {
+            if (null != process) {
+                process.destroy();
+            }
+        }
+    }
+
+    /**
+     * Splits a command line into tokens on runs of whitespace. Leading and trailing
+     * blanks are dropped first so that no empty argument reaches the child process.
+     */
+    private static String[] splitShellString(final String shellString) {
+        return shellString.trim().split("\\s+");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
new file mode 100644
index 0000000..57a451f
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.broker.latency;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.remoting.netty.RequestTask;
+import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * Background task that answers queued send requests with {@code SYSTEM_BUSY} instead of
+ * letting them wait, either while the message store reports the OS page cache as busy
+ * or once they have been queued longer than the configured limit.
+ *
+ * @author shijia.wxr
+ */
+public class BrokerFastFailure {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
+            "BrokerFastFailureScheduledThread"));
+    private final BrokerController brokerController;
+
+    public BrokerFastFailure(final BrokerController brokerController) {
+        this.brokerController = brokerController;
+    }
+
+    /**
+     * Schedules the clean-up to run every 10 ms, starting 1 s from now.
+     */
+    public void start() {
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    cleanExpiredRequest();
+                } catch (Throwable e) {
+                    // A periodic task that throws is never scheduled again, so keep it alive.
+                    log.error("BrokerFastFailure clean expired request exception", e);
+                }
+            }
+        }, 1000, 10, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Rejects queued send requests in two passes: first everything while the page cache
+     * reports busy, then requests at the head of the queue that have waited too long.
+     */
+    private void cleanExpiredRequest() {
+        while (this.brokerController.getMessageStore().isOSPageCacheBusy()) {
+            try {
+                if (this.brokerController.getSendThreadPoolQueue().isEmpty()) {
+                    break;
+                }
+
+                final Runnable runnable = this.brokerController.getSendThreadPoolQueue().poll(0, TimeUnit.SECONDS);
+                if (null == runnable) {
+                    break;
+                }
+
+                final RequestTask rt = castRunnable(runnable);
+                if (null == rt) {
+                    // Not a wrapped RequestTask, so there is no response to send.
+                    continue;
+                }
+                rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", System.currentTimeMillis() - rt.getCreateTimestamp(), this.brokerController.getSendThreadPoolQueue().size()));
+            } catch (Throwable e) {
+                // Best effort: failing to answer one request must not stop the others being cleaned.
+                log.warn("[PCBUSY_CLEAN_QUEUE] reject queued request exception", e);
+            }
+        }
+
+        while (true) {
+            try {
+                if (this.brokerController.getSendThreadPoolQueue().isEmpty()) {
+                    break;
+                }
+
+                final Runnable runnable = this.brokerController.getSendThreadPoolQueue().peek();
+                if (null == runnable) {
+                    break;
+                }
+
+                final RequestTask rt = castRunnable(runnable);
+                // peek() leaves the head in place, so a head that cannot be handled must end
+                // the pass; retrying it in this loop would spin forever.
+                if (null == rt || rt.isStopRun()) {
+                    break;
+                }
+
+                final long behind = System.currentTimeMillis() - rt.getCreateTimestamp();
+                if (behind < this.brokerController.getBrokerConfig().getWaitTimeMillsInSendQueue()) {
+                    break;
+                }
+
+                if (this.brokerController.getSendThreadPoolQueue().remove(runnable)) {
+                    rt.setStopRun(true);
+                    rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, this.brokerController.getSendThreadPoolQueue().size()));
+                }
+            } catch (Throwable e) {
+                // Stop this pass; the head may still be queued and the next tick retries it.
+                log.warn("[TIMEOUT_CLEAN_QUEUE] reject queued request exception", e);
+                break;
+            }
+        }
+    }
+
+    /**
+     * Unwraps the {@link RequestTask} carried by a {@link FutureTaskExt}.
+     *
+     * @param runnable task taken from the send thread pool queue
+     * @return the wrapped request task, or {@code null} if {@code runnable} does not wrap one
+     */
+    public static RequestTask castRunnable(final Runnable runnable) {
+        try {
+            FutureTaskExt<?> object = (FutureTaskExt<?>) runnable;
+            return (RequestTask) object.getRunnable();
+        } catch (Throwable e) {
+            log.error(String.format("castRunnable exception, %s", null == runnable ? "null" : runnable.getClass().getName()), e);
+        }
+
+        return null;
+    }
+
+    public void shutdown() {
+        this.scheduledExecutorService.shutdown();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java
new file mode 100644
index 0000000..352543e
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.latency;
+
+import java.util.concurrent.*;
+
+/**
+ * A {@link ThreadPoolExecutor} that wraps each submitted {@link Runnable} in a
+ * {@link FutureTaskExt} rather than a plain {@code FutureTask}. Only the runnable-based
+ * {@code newTaskFor} overload is overridden.
+ *
+ * @author shijia.wxr
+ */
+public class BrokerFixedThreadPoolExecutor extends ThreadPoolExecutor {
+    /** Delegates to the five-argument {@link ThreadPoolExecutor} constructor. */
+    public BrokerFixedThreadPoolExecutor(
+            final int corePoolSize,
+            final int maximumPoolSize,
+            final long keepAliveTime,
+            final TimeUnit unit,
+            final BlockingQueue<Runnable> workQueue) {
+        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
+    }
+
+    /** Delegates to the {@link ThreadPoolExecutor} constructor that also takes a thread factory. */
+    public BrokerFixedThreadPoolExecutor(
+            final int corePoolSize,
+            final int maximumPoolSize,
+            final long keepAliveTime,
+            final TimeUnit unit,
+            final BlockingQueue<Runnable> workQueue,
+            final ThreadFactory threadFactory) {
+        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
+    }
+
+    /** Delegates to the {@link ThreadPoolExecutor} constructor that also takes a rejection handler. */
+    public BrokerFixedThreadPoolExecutor(
+            final int corePoolSize,
+            final int maximumPoolSize,
+            final long keepAliveTime,
+            final TimeUnit unit,
+            final BlockingQueue<Runnable> workQueue,
+            final RejectedExecutionHandler handler) {
+        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
+    }
+
+    /** Delegates to the {@link ThreadPoolExecutor} constructor taking a thread factory and a rejection handler. */
+    public BrokerFixedThreadPoolExecutor(
+            final int corePoolSize,
+            final int maximumPoolSize,
+            final long keepAliveTime,
+            final TimeUnit unit,
+            final BlockingQueue<Runnable> workQueue,
+            final ThreadFactory threadFactory,
+            final RejectedExecutionHandler handler) {
+        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
+    }
+
+    /**
+     * Creates the task object for a submitted runnable.
+     *
+     * @return a {@link FutureTaskExt} built from {@code runnable} and {@code value}
+     */
+    @Override
+    protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) {
+        return new FutureTaskExt<T>(runnable, value);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/latency/FutureTaskExt.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/latency/FutureTaskExt.java b/broker/src/main/java/org/apache/rocketmq/broker/latency/FutureTaskExt.java
new file mode 100644
index 0000000..642cdd9
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/latency/FutureTaskExt.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.latency;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.FutureTask;
+
+/**
+ * A {@link FutureTask} that remembers the {@link Runnable} it was built from.
+ *
+ * @author shijia.wxr
+ */
+public class FutureTaskExt<V> extends FutureTask<V> {
+    /** The wrapped runnable; {@code null} when this task was built from a {@link Callable}. */
+    private final Runnable task;
+
+    /**
+     * Wraps a runnable together with the value to report once it has run.
+     */
+    public FutureTaskExt(final Runnable runnable, final V result) {
+        super(runnable, result);
+        this.task = runnable;
+    }
+
+    /**
+     * Wraps a callable; no runnable is recorded in this case.
+     */
+    public FutureTaskExt(final Callable<V> callable) {
+        super(callable);
+        this.task = null;
+    }
+
+    /**
+     * @return the runnable given at construction, or {@code null} for a callable-based task
+     */
+    public Runnable getRunnable() {
+        return this.task;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/longpolling/ManyPullRequest.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/ManyPullRequest.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/ManyPullRequest.java
new file mode 100644
index 0000000..7e9e40a
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/ManyPullRequest.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.broker.longpolling;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * Holder for a batch of {@link PullRequest}s. Every method is synchronized on the instance.
+ *
+ * @author shijia.wxr
+ */
+public class ManyPullRequest {
+    private final ArrayList<PullRequest> requests = new ArrayList<PullRequest>();
+
+    /** Appends a single request. */
+    public synchronized void addPullRequest(final PullRequest pullRequest) {
+        requests.add(pullRequest);
+    }
+
+    /** Appends every request contained in {@code many}. */
+    public synchronized void addPullRequest(final List<PullRequest> many) {
+        requests.addAll(many);
+    }
+
+    /**
+     * Hands over all held requests and empties this holder.
+     *
+     * @return a copy of the held requests, or {@code null} when nothing is held
+     */
+    public synchronized List<PullRequest> cloneListAndClear() {
+        if (requests.isEmpty()) {
+            return null;
+        }
+
+        final List<PullRequest> snapshot = new ArrayList<PullRequest>(requests);
+        requests.clear();
+        return snapshot;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
new file mode 100644
index 0000000..f953c1e
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.longpolling;
+
+import org.apache.rocketmq.store.MessageArrivingListener;
+
+
+/**
+ * {@link MessageArrivingListener} that forwards every notification to a
+ * {@link PullRequestHoldService}.
+ */
+public class NotifyMessageArrivingListener implements MessageArrivingListener {
+    private final PullRequestHoldService holdService;
+
+    public NotifyMessageArrivingListener(final PullRequestHoldService pullRequestHoldService) {
+        holdService = pullRequestHoldService;
+    }
+
+    /**
+     * Passes all four arguments, unchanged, to {@code notifyMessageArriving} on the hold service.
+     */
+    @Override
+    public void arriving(String topic, int queueId, long logicOffset, long tagsCode) {
+        holdService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequest.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequest.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequest.java
new file mode 100644
index 0000000..cf03b03
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequest.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.broker.longpolling;
+
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import io.netty.channel.Channel;
+
+
+/**
+ * Value holder for one pull request: every field is final, assigned once in the
+ * constructor and exposed through a getter.
+ *
+ * @author shijia.wxr
+ */
+public class PullRequest {
+    private final RemotingCommand requestCommand;
+    private final Channel clientChannel;
+    private final long timeoutMillis;
+    private final long suspendTimestamp;
+    private final long pullFromThisOffset;
+    private final SubscriptionData subscriptionData;
+
+    /**
+     * Stores the given values as-is; no validation or copying is performed.
+     */
+    public PullRequest(
+            RemotingCommand requestCommand,
+            Channel clientChannel,
+            long timeoutMillis,
+            long suspendTimestamp,
+            long pullFromThisOffset,
+            SubscriptionData subscriptionData) {
+        this.requestCommand = requestCommand;
+        this.clientChannel = clientChannel;
+        this.timeoutMillis = timeoutMillis;
+        this.suspendTimestamp = suspendTimestamp;
+        this.pullFromThisOffset = pullFromThisOffset;
+        this.subscriptionData = subscriptionData;
+    }
+
+    public RemotingCommand getRequestCommand() {
+        return this.requestCommand;
+    }
+
+    public Channel getClientChannel() {
+        return this.clientChannel;
+    }
+
+    public long getTimeoutMillis() {
+        return this.timeoutMillis;
+    }
+
+    public long getSuspendTimestamp() {
+        return this.suspendTimestamp;
+    }
+
+    public long getPullFromThisOffset() {
+        return this.pullFromThisOffset;
+    }
+
+    public SubscriptionData getSubscriptionData() {
+        return this.subscriptionData;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
new file mode 100644
index 0000000..19a3f54
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.longpolling;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.SystemClock;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.store.DefaultMessageFilter;
+import org.apache.rocketmq.store.MessageFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * Service thread that parks pull requests per {@code topic@queueId} and re-executes them
+ * once a newer matching message is available or their suspension has timed out.
+ *
+ * @author shijia.wxr
+ */
+public class PullRequestHoldService extends ServiceThread {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+    private static final String TOPIC_QUEUEID_SEPARATOR = "@";
+    private final BrokerController brokerController;
+    private final SystemClock systemClock = new SystemClock();
+    private final MessageFilter messageFilter = new DefaultMessageFilter();
+    private final ConcurrentHashMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable =
+            new ConcurrentHashMap<String, ManyPullRequest>(1024);
+
+    public PullRequestHoldService(final BrokerController brokerController) {
+        this.brokerController = brokerController;
+    }
+
+    /**
+     * Parks {@code pullRequest} under the given topic and queue until it is woken up.
+     */
+    public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) {
+        String key = this.buildKey(topic, queueId);
+        ManyPullRequest mpr = this.pullRequestTable.get(key);
+        if (null == mpr) {
+            mpr = new ManyPullRequest();
+            // Another thread may have registered the same key in the meantime; use its holder.
+            ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);
+            if (prev != null) {
+                mpr = prev;
+            }
+        }
+
+        mpr.addPullRequest(pullRequest);
+    }
+
+    private String buildKey(final String topic, final int queueId) {
+        return topic + TOPIC_QUEUEID_SEPARATOR + queueId;
+    }
+
+    /**
+     * Until stopped, pauses via {@code waitForRunning} (5 s with long polling, otherwise the
+     * configured short polling time) and then re-checks every parked request.
+     */
+    @Override
+    public void run() {
+        log.info(this.getServiceName() + " service started");
+        while (!this.isStopped()) {
+            try {
+                if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
+                    this.waitForRunning(5 * 1000);
+                } else {
+                    this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
+                }
+
+                long beginLockTimestamp = this.systemClock.now();
+                this.checkHoldRequest();
+                long costTime = this.systemClock.now() - beginLockTimestamp;
+                if (costTime > 5 * 1000) {
+                    log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
+                }
+            } catch (Throwable e) {
+                log.warn(this.getServiceName() + " service has exception. ", e);
+            }
+        }
+
+        log.info(this.getServiceName() + " service end");
+    }
+
+    @Override
+    public String getServiceName() {
+        return PullRequestHoldService.class.getSimpleName();
+    }
+
+    /**
+     * Re-evaluates the parked requests of every known queue against the store's current
+     * maximum offset. A failure on one queue is logged and does not stop the sweep.
+     */
+    private void checkHoldRequest() {
+        for (String key : this.pullRequestTable.keySet()) {
+            String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
+            if (2 != kArray.length) {
+                continue;
+            }
+
+            String topic = kArray[0];
+            try {
+                int queueId = Integer.parseInt(kArray[1]);
+                final long offset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId);
+                this.notifyMessageArriving(topic, queueId, offset);
+            } catch (Throwable e) {
+                log.error("check hold request failed. topic={}, queueId={}", topic, kArray[1], e);
+            }
+        }
+    }
+
+    public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset) {
+        notifyMessageArriving(topic, queueId, maxOffset, null);
+    }
+
+    /**
+     * Wakes up the parked requests of one queue that can now be served or have timed out;
+     * all others are parked again.
+     *
+     * @param topic topic of the queue
+     * @param queueId id of the queue
+     * @param maxOffset maximum offset known to the caller
+     * @param tagsCode value handed to the message filter; may be {@code null}
+     */
+    public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode) {
+        String key = this.buildKey(topic, queueId);
+        ManyPullRequest mpr = this.pullRequestTable.get(key);
+        if (null == mpr) {
+            return;
+        }
+
+        List<PullRequest> requestList = mpr.cloneListAndClear();
+        if (null == requestList) {
+            return;
+        }
+
+        // NOTE(review): the requests have already been taken out of mpr, so an exception that
+        // escapes this loop means the unprocessed ones and replayList are never put back.
+        List<PullRequest> replayList = new ArrayList<PullRequest>();
+        for (PullRequest request : requestList) {
+            long newestOffset = maxOffset;
+            if (newestOffset <= request.getPullFromThisOffset()) {
+                // The offset passed in shows nothing new for this request; ask the store directly.
+                newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId);
+            }
+
+            boolean matched = newestOffset > request.getPullFromThisOffset()
+                    && this.messageFilter.isMessageMatched(request.getSubscriptionData(), tagsCode);
+            if (matched || this.isTimedOut(request)) {
+                this.executeWhenWakeup(request);
+            } else {
+                replayList.add(request);
+            }
+        }
+
+        if (!replayList.isEmpty()) {
+            mpr.addPullRequest(replayList);
+        }
+    }
+
+    /** Tells whether the request has been suspended for at least its timeout. */
+    private boolean isTimedOut(final PullRequest request) {
+        return System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis());
+    }
+
+    /** Re-executes the request through the pull message processor; failures are only logged. */
+    private void executeWhenWakeup(final PullRequest request) {
+        try {
+            this.brokerController.getPullMessageProcessor().excuteRequestWhenWakeup(request.getClientChannel(),
+                    request.getRequestCommand());
+        } catch (Throwable e) {
+            log.error("execute request when wakeup failed.", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageContext.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageContext.java b/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageContext.java
new file mode 100644
index 0000000..3a167fa
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageContext.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.broker.mqtrace;
+
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
+
+import java.util.Map;
+
+
+/**
+ * Mutable holder of the values handed to {@link ConsumeMessageHook} callbacks; it has no
+ * logic beyond plain getters and setters.
+ */
+public class ConsumeMessageContext {
+    private String consumerGroup;
+    private String topic;
+    private Integer queueId;
+    private String clientHost;
+    private String storeHost;
+    private Map<String, Long> messageIds;
+    private int bodyLength;
+    private boolean success;
+    private String status;
+    private Object mqTraceContext;
+
+    // Fields prefixed "commercial"; how they are used is decided elsewhere and not visible here.
+    private String commercialOwner;
+    private BrokerStatsManager.StatsType commercialRcvStats;
+    private int commercialRcvTimes;
+    private int commercialRcvSize;
+
+    public String getConsumerGroup() {
+        return this.consumerGroup;
+    }
+
+    public void setConsumerGroup(final String consumerGroup) {
+        this.consumerGroup = consumerGroup;
+    }
+
+    public String getTopic() {
+        return this.topic;
+    }
+
+    public void setTopic(final String topic) {
+        this.topic = topic;
+    }
+
+    public Integer getQueueId() {
+        return this.queueId;
+    }
+
+    public void setQueueId(final Integer queueId) {
+        this.queueId = queueId;
+    }
+
+    public String getClientHost() {
+        return this.clientHost;
+    }
+
+    public void setClientHost(final String clientHost) {
+        this.clientHost = clientHost;
+    }
+
+    public String getStoreHost() {
+        return this.storeHost;
+    }
+
+    public void setStoreHost(final String storeHost) {
+        this.storeHost = storeHost;
+    }
+
+    public Map<String, Long> getMessageIds() {
+        return this.messageIds;
+    }
+
+    public void setMessageIds(final Map<String, Long> messageIds) {
+        this.messageIds = messageIds;
+    }
+
+    public int getBodyLength() {
+        return this.bodyLength;
+    }
+
+    public void setBodyLength(final int bodyLength) {
+        this.bodyLength = bodyLength;
+    }
+
+    public boolean isSuccess() {
+        return this.success;
+    }
+
+    public void setSuccess(final boolean success) {
+        this.success = success;
+    }
+
+    public String getStatus() {
+        return this.status;
+    }
+
+    public void setStatus(final String status) {
+        this.status = status;
+    }
+
+    public Object getMqTraceContext() {
+        return this.mqTraceContext;
+    }
+
+    public void setMqTraceContext(final Object mqTraceContext) {
+        this.mqTraceContext = mqTraceContext;
+    }
+
+    public String getCommercialOwner() {
+        return this.commercialOwner;
+    }
+
+    public void setCommercialOwner(final String commercialOwner) {
+        this.commercialOwner = commercialOwner;
+    }
+
+    public BrokerStatsManager.StatsType getCommercialRcvStats() {
+        return this.commercialRcvStats;
+    }
+
+    public void setCommercialRcvStats(final BrokerStatsManager.StatsType commercialRcvStats) {
+        this.commercialRcvStats = commercialRcvStats;
+    }
+
+    public int getCommercialRcvTimes() {
+        return this.commercialRcvTimes;
+    }
+
+    public void setCommercialRcvTimes(final int commercialRcvTimes) {
+        this.commercialRcvTimes = commercialRcvTimes;
+    }
+
+    public int getCommercialRcvSize() {
+        return this.commercialRcvSize;
+    }
+
+    public void setCommercialRcvSize(final int commercialRcvSize) {
+        this.commercialRcvSize = commercialRcvSize;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageHook.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageHook.java b/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageHook.java
new file mode 100644
index 0000000..c4b7f36
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageHook.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.broker.mqtrace;
+
+/**
+ * Callback contract for observing message consumption.
+ *
+ * NOTE(review): the call sites are not part of this file; the method names
+ * suggest the two callbacks bracket the handling of a consume request.
+ */
+public interface ConsumeMessageHook {
+    /** @return a name identifying this hook implementation */
+    String hookName();
+
+    /**
+     * Callback paired with {@link #consumeMessageAfter(ConsumeMessageContext)}.
+     *
+     * @param context data describing the consume operation
+     */
+    void consumeMessageBefore(ConsumeMessageContext context);
+
+    /**
+     * Callback paired with {@link #consumeMessageBefore(ConsumeMessageContext)}.
+     *
+     * @param context data describing the consume operation
+     */
+    void consumeMessageAfter(ConsumeMessageContext context);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageContext.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageContext.java b/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageContext.java
new file mode 100644
index 0000000..ca8121d
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageContext.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.broker.mqtrace;
+
+import org.apache.rocketmq.common.message.MessageType;
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
+
+import java.util.Properties;
+
+
+/**
+ * Mutable data holder handed to {@link SendMessageHook} callbacks.
+ *
+ * Every field is exposed through a plain getter/setter pair; the class performs
+ * no validation and no synchronization. Freshly created instances start with
+ * {@code msgType == MessageType.Trans_msg_Commit} and {@code isSuccess == false};
+ * all other fields keep their Java default values until a setter is called.
+ */
+public class SendMessageContext {
+    private String producerGroup;
+    private String topic;
+    private String msgId;
+    private String originMsgId;
+    private Integer queueId;
+    private Long queueOffset;
+    private String brokerAddr;
+    private String bornHost;
+    private int bodyLength;
+    private int code;
+    private String errorMsg;
+    private String msgProps;
+    private Object mqTraceContext;
+    private Properties extProps;
+    private String brokerRegionId;
+    private String msgUniqueKey;
+    private long bornTimeStamp;
+    private MessageType msgType = MessageType.Trans_msg_Commit;
+    private boolean isSuccess = false;
+    //For Commercial
+    private String commercialOwner;
+    private BrokerStatsManager.StatsType commercialSendStats;
+    private int commercialSendSize;
+    private int commercialSendTimes;
+
+    // ---- outcome / message classification ----
+
+    public boolean isSuccess() {
+        return this.isSuccess;
+    }
+
+    public void setSuccess(final boolean success) {
+        this.isSuccess = success;
+    }
+
+    public MessageType getMsgType() {
+        return this.msgType;
+    }
+
+    public void setMsgType(final MessageType msgType) {
+        this.msgType = msgType;
+    }
+
+    public String getMsgUniqueKey() {
+        return this.msgUniqueKey;
+    }
+
+    public void setMsgUniqueKey(final String msgUniqueKey) {
+        this.msgUniqueKey = msgUniqueKey;
+    }
+
+    public long getBornTimeStamp() {
+        return this.bornTimeStamp;
+    }
+
+    public void setBornTimeStamp(final long bornTimeStamp) {
+        this.bornTimeStamp = bornTimeStamp;
+    }
+
+    public String getBrokerRegionId() {
+        return this.brokerRegionId;
+    }
+
+    public void setBrokerRegionId(final String brokerRegionId) {
+        this.brokerRegionId = brokerRegionId;
+    }
+
+    // ---- message identity and routing ----
+
+    public String getProducerGroup() {
+        return this.producerGroup;
+    }
+
+    public void setProducerGroup(final String producerGroup) {
+        this.producerGroup = producerGroup;
+    }
+
+    public String getTopic() {
+        return this.topic;
+    }
+
+    public void setTopic(final String topic) {
+        this.topic = topic;
+    }
+
+    public String getMsgId() {
+        return this.msgId;
+    }
+
+    public void setMsgId(final String msgId) {
+        this.msgId = msgId;
+    }
+
+    public String getOriginMsgId() {
+        return this.originMsgId;
+    }
+
+    public void setOriginMsgId(final String originMsgId) {
+        this.originMsgId = originMsgId;
+    }
+
+    public Integer getQueueId() {
+        return this.queueId;
+    }
+
+    public void setQueueId(final Integer queueId) {
+        this.queueId = queueId;
+    }
+
+    public Long getQueueOffset() {
+        return this.queueOffset;
+    }
+
+    public void setQueueOffset(final Long queueOffset) {
+        this.queueOffset = queueOffset;
+    }
+
+    public String getBrokerAddr() {
+        return this.brokerAddr;
+    }
+
+    public void setBrokerAddr(final String brokerAddr) {
+        this.brokerAddr = brokerAddr;
+    }
+
+    public String getBornHost() {
+        return this.bornHost;
+    }
+
+    public void setBornHost(final String bornHost) {
+        this.bornHost = bornHost;
+    }
+
+    // ---- payload and result details ----
+
+    public int getBodyLength() {
+        return this.bodyLength;
+    }
+
+    public void setBodyLength(final int bodyLength) {
+        this.bodyLength = bodyLength;
+    }
+
+    public int getCode() {
+        return this.code;
+    }
+
+    public void setCode(final int code) {
+        this.code = code;
+    }
+
+    public String getErrorMsg() {
+        return this.errorMsg;
+    }
+
+    public void setErrorMsg(final String errorMsg) {
+        this.errorMsg = errorMsg;
+    }
+
+    public String getMsgProps() {
+        return this.msgProps;
+    }
+
+    public void setMsgProps(final String msgProps) {
+        this.msgProps = msgProps;
+    }
+
+    public Object getMqTraceContext() {
+        return this.mqTraceContext;
+    }
+
+    public void setMqTraceContext(final Object mqTraceContext) {
+        this.mqTraceContext = mqTraceContext;
+    }
+
+    public Properties getExtProps() {
+        return this.extProps;
+    }
+
+    public void setExtProps(final Properties extProps) {
+        this.extProps = extProps;
+    }
+
+    // ---- commercial statistics ----
+
+    public String getCommercialOwner() {
+        return this.commercialOwner;
+    }
+
+    public void setCommercialOwner(final String commercialOwner) {
+        this.commercialOwner = commercialOwner;
+    }
+
+    public BrokerStatsManager.StatsType getCommercialSendStats() {
+        return this.commercialSendStats;
+    }
+
+    public void setCommercialSendStats(final BrokerStatsManager.StatsType commercialSendStats) {
+        this.commercialSendStats = commercialSendStats;
+    }
+
+    public int getCommercialSendSize() {
+        return this.commercialSendSize;
+    }
+
+    public void setCommercialSendSize(final int commercialSendSize) {
+        this.commercialSendSize = commercialSendSize;
+    }
+
+    public int getCommercialSendTimes() {
+        return this.commercialSendTimes;
+    }
+
+    public void setCommercialSendTimes(final int commercialSendTimes) {
+        this.commercialSendTimes = commercialSendTimes;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageHook.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageHook.java b/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageHook.java
new file mode 100644
index 0000000..84cbdcb
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageHook.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.broker.mqtrace;
+
+/**
+ * Callback contract for observing message sending.
+ *
+ * NOTE(review): the call sites are not part of this file; the method names
+ * suggest the two callbacks bracket the handling of a send request.
+ */
+public interface SendMessageHook {
+    /** @return a name identifying this hook implementation */
+    String hookName();
+
+    /**
+     * Callback paired with {@link #sendMessageAfter(SendMessageContext)}.
+     *
+     * @param context data describing the send operation
+     */
+    void sendMessageBefore(SendMessageContext context);
+
+    /**
+     * Callback paired with {@link #sendMessageBefore(SendMessageContext)}.
+     *
+     * @param context data describing the send operation
+     */
+    void sendMessageAfter(SendMessageContext context);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
new file mode 100644
index 0000000..8a1773a
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.broker.offset;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.BrokerPathConfigHelper;
+import org.apache.rocketmq.common.ConfigManager;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * Keeps the consume offsets of every consumer group, keyed by
+ * {@code topic@group} and then by queue id, and (de)serializes them as JSON
+ * through {@link ConfigManager}.
+ *
+ * @author shijia.wxr
+ */
+public class ConsumerOffsetManager extends ConfigManager {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+    private static final String TOPIC_GROUP_SEPARATOR = "@";
+
+    private ConcurrentHashMap<String/* topic@group */, ConcurrentHashMap<Integer, Long>> offsetTable =
+            new ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>>(512);
+
+    private transient BrokerController brokerController;
+
+    /** Creates an instance without a broker controller (used when decoding from JSON). */
+    public ConsumerOffsetManager() {
+    }
+
+    /**
+     * @param brokerController controller used to reach the message store, the
+     *                         consumer manager and the store configuration
+     */
+    public ConsumerOffsetManager(BrokerController brokerController) {
+        this.brokerController = brokerController;
+    }
+
+    /**
+     * Drops every {@code topic@group} entry for which the group no longer has
+     * subscription data for the topic and all stored offsets are at or below
+     * the minimum offset still present in the store.
+     */
+    public void scanUnsubscribedTopic() {
+        Iterator<Entry<String, ConcurrentHashMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<String, ConcurrentHashMap<Integer, Long>> next = it.next();
+            String topicAtGroup = next.getKey();
+            String[] parts = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
+            if (parts.length != 2) {
+                continue;
+            }
+
+            String topic = parts[0];
+            String group = parts[1];
+            boolean unsubscribed = null == brokerController.getConsumerManager().findSubscriptionData(group, topic);
+            if (unsubscribed && this.offsetBehindMuchThanData(topic, next.getValue())) {
+                it.remove();
+                log.warn("remove topic offset, {}", topicAtGroup);
+            }
+        }
+    }
+
+    /**
+     * @return {@code true} only when {@code table} is non-empty and none of its
+     *         offsets is greater than the store's minimum offset of that queue
+     */
+    private boolean offsetBehindMuchThanData(final String topic, ConcurrentHashMap<Integer, Long> table) {
+        if (table.isEmpty()) {
+            return false;
+        }
+
+        for (Entry<Integer, Long> queueOffset : table.entrySet()) {
+            long minOffsetInStore = this.brokerController.getMessageStore().getMinOffsetInQuque(topic, queueOffset.getKey());
+            long offsetInPersist = queueOffset.getValue();
+            if (offsetInPersist > minOffsetInStore) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    /**
+     * @param group consumer group name
+     * @return the topics for which {@code group} has a stored offset entry
+     */
+    public Set<String> whichTopicByConsumer(final String group) {
+        Set<String> topics = new HashSet<String>();
+
+        for (String topicAtGroup : this.offsetTable.keySet()) {
+            String[] parts = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
+            if (parts.length == 2 && group.equals(parts[1])) {
+                topics.add(parts[0]);
+            }
+        }
+
+        return topics;
+    }
+
+    /**
+     * @param topic topic name
+     * @return the consumer groups that have a stored offset entry for {@code topic}
+     */
+    public Set<String> whichGroupByTopic(final String topic) {
+        Set<String> groups = new HashSet<String>();
+
+        for (String topicAtGroup : this.offsetTable.keySet()) {
+            String[] parts = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
+            if (parts.length == 2 && topic.equals(parts[0])) {
+                groups.add(parts[1]);
+            }
+        }
+
+        return groups;
+    }
+
+    /**
+     * Stores {@code offset} for the given group, topic and queue.
+     *
+     * @param clientHost address of the committing client, used for logging only
+     */
+    public void commitOffset(final String clientHost, final String group, final String topic, final int queueId, final long offset) {
+        // topic@group
+        String key = topic + TOPIC_GROUP_SEPARATOR + group;
+        this.commitOffset(clientHost, key, queueId, offset);
+    }
+
+    /**
+     * Stores {@code offset} under {@code key}/{@code queueId} and warns when the
+     * new offset is smaller than the one it replaces.
+     */
+    private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
+        ConcurrentHashMap<Integer, Long> map = this.offsetTable.get(key);
+        if (null == map) {
+            ConcurrentHashMap<Integer, Long> created = new ConcurrentHashMap<Integer, Long>(32);
+            created.put(queueId, offset);
+            // putIfAbsent keeps a map installed concurrently by another thread
+            // instead of overwriting it and losing that thread's offsets.
+            map = this.offsetTable.putIfAbsent(key, created);
+            if (null == map) {
+                return;
+            }
+        }
+
+        Long storeOffset = map.put(queueId, offset);
+        if (storeOffset != null && offset < storeOffset) {
+            log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset);
+        }
+    }
+
+    /**
+     * @return the stored offset of the queue, or {@code -1} when none is stored
+     */
+    public long queryOffset(final String group, final String topic, final int queueId) {
+        // topic@group
+        String key = topic + TOPIC_GROUP_SEPARATOR + group;
+        ConcurrentHashMap<Integer, Long> map = this.offsetTable.get(key);
+        Long offset = (null == map) ? null : map.get(queueId);
+        return (offset != null) ? offset : -1;
+    }
+
+    /** @return this manager serialized as compact JSON */
+    public String encode() {
+        return this.encode(false);
+    }
+
+    @Override
+    public String configFilePath() {
+        return BrokerPathConfigHelper.getConsumerOffsetPath(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
+    }
+
+    /**
+     * Replaces the offset table with the one parsed from {@code jsonString};
+     * a {@code null} string or a {@code null} parse result leaves it untouched.
+     */
+    @Override
+    public void decode(String jsonString) {
+        if (null == jsonString) {
+            return;
+        }
+
+        ConsumerOffsetManager obj = RemotingSerializable.fromJson(jsonString, ConsumerOffsetManager.class);
+        if (obj != null) {
+            this.offsetTable = obj.offsetTable;
+        }
+    }
+
+    /**
+     * @param prettyFormat passed through to {@code RemotingSerializable.toJson}
+     * @return this manager serialized as JSON
+     */
+    public String encode(final boolean prettyFormat) {
+        return RemotingSerializable.toJson(this, prettyFormat);
+    }
+
+    /** @return the live offset table (not a copy) */
+    public ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> getOffsetTable() {
+        return offsetTable;
+    }
+
+    /** @param offsetTable table that replaces the current one */
+    public void setOffsetTable(ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> offsetTable) {
+        this.offsetTable = offsetTable;
+    }
+
+    /**
+     * Computes, per queue of {@code topic}, the smallest stored offset among all
+     * consumer groups, ignoring groups listed in {@code filterGroups} and
+     * offsets that are below the store's minimum offset of the queue.
+     *
+     * This is a pure query: the offset table itself is never modified. (Filtering
+     * through the key-set iterator would delete the filtered groups' offsets,
+     * because the key set is a live view of the table.)
+     *
+     * @param topic        topic whose queues are inspected
+     * @param filterGroups comma separated group names to ignore; may be blank
+     * @return queue id to minimum offset; queues without a qualifying offset are absent
+     */
+    public Map<Integer, Long> queryMinOffsetInAllGroup(final String topic, final String filterGroups) {
+        Map<Integer, Long> queueMinOffset = new HashMap<Integer, Long>();
+
+        Set<String> excludedGroups = new HashSet<String>();
+        if (!UtilAll.isBlank(filterGroups)) {
+            excludedGroups.addAll(Arrays.asList(filterGroups.split(",")));
+        }
+
+        for (Map.Entry<String, ConcurrentHashMap<Integer, Long>> offSetEntry : this.offsetTable.entrySet()) {
+            String[] topicGroupArr = offSetEntry.getKey().split(TOPIC_GROUP_SEPARATOR);
+            // Keys without a topic part cannot match; keys without a group part cannot be excluded.
+            if (topicGroupArr.length == 0 || !topic.equals(topicGroupArr[0])) {
+                continue;
+            }
+            if (topicGroupArr.length > 1 && excludedGroups.contains(topicGroupArr[1])) {
+                continue;
+            }
+
+            for (Entry<Integer, Long> entry : offSetEntry.getValue().entrySet()) {
+                long minOffset = this.brokerController.getMessageStore().getMinOffsetInQuque(topic, entry.getKey());
+                if (entry.getValue() < minOffset) {
+                    continue;
+                }
+
+                Long known = queueMinOffset.get(entry.getKey());
+                if (known == null) {
+                    queueMinOffset.put(entry.getKey(), entry.getValue());
+                } else {
+                    queueMinOffset.put(entry.getKey(), Math.min(entry.getValue(), known));
+                }
+            }
+        }
+
+        return queueMinOffset;
+    }
+
+    /**
+     * @return the live queue-to-offset map of the group on the topic, or
+     *         {@code null} when nothing is stored for that pair
+     */
+    public Map<Integer, Long> queryOffset(final String group, final String topic) {
+        // topic@group
+        String key = topic + TOPIC_GROUP_SEPARATOR + group;
+        return this.offsetTable.get(key);
+    }
+
+    /**
+     * Copies the offsets of {@code srcGroup} on {@code topic} to {@code destGroup},
+     * replacing whatever {@code destGroup} had; does nothing when the source has none.
+     */
+    public void cloneOffset(final String srcGroup, final String destGroup, final String topic) {
+        ConcurrentHashMap<Integer, Long> offsets = this.offsetTable.get(topic + TOPIC_GROUP_SEPARATOR + srcGroup);
+        if (null == offsets) {
+            return;
+        }
+
+        this.offsetTable.put(topic + TOPIC_GROUP_SEPARATOR + destGroup, new ConcurrentHashMap<Integer, Long>(offsets));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
new file mode 100644
index 0000000..2836c4c
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -0,0 +1,302 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.out;
+
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
+import org.apache.rocketmq.common.namesrv.TopAddressing;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.*;
+import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.RemotingClient;
+import org.apache.rocketmq.remoting.exception.*;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * Client-side facade for the remote calls a broker issues: registering and
+ * unregistering with name servers and fetching configuration snapshots from
+ * another address.
+ *
+ * @author shijia.wxr
+ * @author manhong.yqd
+ */
+public class BrokerOuterAPI {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+    /** Timeout, in milliseconds, of the synchronous calls that take no explicit timeout. */
+    private static final int DEFAULT_SYNC_TIMEOUT_MILLIS = 3000;
+    private final RemotingClient remotingClient;
+    private final TopAddressing topAddressing = new TopAddressing(MixAll.WS_ADDR);
+    private String nameSrvAddr = null;
+
+    /** Creates the API without an RPC hook. */
+    public BrokerOuterAPI(final NettyClientConfig nettyClientConfig) {
+        this(nettyClientConfig, null);
+    }
+
+    /**
+     * @param nettyClientConfig configuration of the underlying remoting client
+     * @param rpcHook           hook registered on the remoting client; passed through as is
+     */
+    public BrokerOuterAPI(final NettyClientConfig nettyClientConfig, RPCHook rpcHook) {
+        this.remotingClient = new NettyRemotingClient(nettyClientConfig);
+        this.remotingClient.registerRPCHook(rpcHook);
+    }
+
+    /** Starts the underlying remoting client. */
+    public void start() {
+        this.remotingClient.start();
+    }
+
+    /** Shuts the underlying remoting client down. */
+    public void shutdown() {
+        this.remotingClient.shutdown();
+    }
+
+    /**
+     * Fetches the name server address list through {@code TopAddressing} and,
+     * when it differs from the cached one, pushes it to the remoting client.
+     * Failures are logged and leave the cached value unchanged.
+     *
+     * @return the cached name server address string, possibly {@code null}
+     */
+    public String fetchNameServerAddr() {
+        try {
+            String addrs = this.topAddressing.fetchNSAddr();
+            if (addrs != null && !addrs.equals(this.nameSrvAddr)) {
+                log.info("name server address changed, old: " + this.nameSrvAddr + " new: " + addrs);
+                this.updateNameServerAddressList(addrs);
+                this.nameSrvAddr = addrs;
+            }
+        } catch (Exception e) {
+            log.error("fetchNameServerAddr Exception", e);
+        }
+        return nameSrvAddr;
+    }
+
+    /**
+     * @param addrs name server addresses separated by {@code ';'}
+     */
+    public void updateNameServerAddressList(final String addrs) {
+        // String.split never returns null, so no null check is needed on the result.
+        String[] addrArray = addrs.split(";");
+        List<String> lst = new ArrayList<String>(addrArray.length);
+        for (String addr : addrArray) {
+            lst.add(addr);
+        }
+
+        this.remotingClient.updateNameServerAddressList(lst);
+    }
+
+    /**
+     * Registers this broker with every known name server. A failure against one
+     * name server is logged and does not stop the remaining registrations.
+     *
+     * @return the last non-null result obtained, or {@code null} when there was none
+     *         (always the case for oneway registration)
+     */
+    public RegisterBrokerResult registerBrokerAll(
+            final String clusterName,
+            final String brokerAddr,
+            final String brokerName,
+            final long brokerId,
+            final String haServerAddr,
+            final TopicConfigSerializeWrapper topicConfigWrapper,
+            final List<String> filterServerList,
+            final boolean oneway,
+            final int timeoutMills) {
+        RegisterBrokerResult registerBrokerResult = null;
+
+        List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
+        if (null == nameServerAddressList) {
+            return registerBrokerResult;
+        }
+
+        for (String namesrvAddr : nameServerAddressList) {
+            try {
+                RegisterBrokerResult result = this.registerBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId,
+                        haServerAddr, topicConfigWrapper, filterServerList, oneway, timeoutMills);
+                if (result != null) {
+                    registerBrokerResult = result;
+                }
+
+                log.info("register broker to name server {} OK", namesrvAddr);
+            } catch (Exception e) {
+                log.warn("registerBroker Exception, " + namesrvAddr, e);
+            }
+        }
+
+        return registerBrokerResult;
+    }
+
+    /**
+     * Sends one REGISTER_BROKER request to {@code namesrvAddr}.
+     *
+     * @return the decoded result, or {@code null} for a oneway request
+     * @throws MQBrokerException when the name server answers with a non-success code
+     */
+    private RegisterBrokerResult registerBroker(
+            final String namesrvAddr,
+            final String clusterName,
+            final String brokerAddr,
+            final String brokerName,
+            final long brokerId,
+            final String haServerAddr,
+            final TopicConfigSerializeWrapper topicConfigWrapper,
+            final List<String> filterServerList,
+            final boolean oneway,
+            final int timeoutMills
+    ) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
+            InterruptedException {
+        RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
+        requestHeader.setBrokerAddr(brokerAddr);
+        requestHeader.setBrokerId(brokerId);
+        requestHeader.setBrokerName(brokerName);
+        requestHeader.setClusterName(clusterName);
+        requestHeader.setHaServerAddr(haServerAddr);
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
+
+        RegisterBrokerBody requestBody = new RegisterBrokerBody();
+        requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
+        requestBody.setFilterServerList(filterServerList);
+        request.setBody(requestBody.encode());
+
+        if (oneway) {
+            try {
+                this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
+            } catch (RemotingTooMuchRequestException e) {
+                // Oneway registration is best effort: record the drop instead of failing the caller.
+                log.warn("registerBroker oneway request rejected, " + namesrvAddr, e);
+            }
+            return null;
+        }
+
+        RemotingCommand response = this.invokeSyncExpectingSuccess(namesrvAddr, request, timeoutMills);
+        RegisterBrokerResponseHeader responseHeader =
+                (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
+        RegisterBrokerResult result = new RegisterBrokerResult();
+        result.setMasterAddr(responseHeader.getMasterAddr());
+        result.setHaServerAddr(responseHeader.getHaServerAddr());
+        if (response.getBody() != null) {
+            result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));
+        }
+        return result;
+    }
+
+    /**
+     * Unregisters this broker from every known name server; a failure against
+     * one name server is logged and does not stop the remaining calls.
+     */
+    public void unregisterBrokerAll(
+            final String clusterName,
+            final String brokerAddr,
+            final String brokerName,
+            final long brokerId
+    ) {
+        List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
+        if (null == nameServerAddressList) {
+            return;
+        }
+
+        for (String namesrvAddr : nameServerAddressList) {
+            try {
+                this.unregisterBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId);
+                log.info("unregisterBroker OK, NamesrvAddr: {}", namesrvAddr);
+            } catch (Exception e) {
+                log.warn("unregisterBroker Exception, " + namesrvAddr, e);
+            }
+        }
+    }
+
+    /**
+     * Sends one UNREGISTER_BROKER request to {@code namesrvAddr}.
+     *
+     * @throws MQBrokerException when the name server answers with a non-success code
+     */
+    public void unregisterBroker(
+            final String namesrvAddr,
+            final String clusterName,
+            final String brokerAddr,
+            final String brokerName,
+            final long brokerId
+    ) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
+        UnRegisterBrokerRequestHeader requestHeader = new UnRegisterBrokerRequestHeader();
+        requestHeader.setBrokerAddr(brokerAddr);
+        requestHeader.setBrokerId(brokerId);
+        requestHeader.setBrokerName(brokerName);
+        requestHeader.setClusterName(clusterName);
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_BROKER, requestHeader);
+
+        this.invokeSyncExpectingSuccess(namesrvAddr, request, DEFAULT_SYNC_TIMEOUT_MILLIS);
+    }
+
+    /**
+     * Fetches all topic configuration from {@code addr}, using the address
+     * produced by {@code MixAll.brokerVIPChannel(true, addr)}.
+     *
+     * @throws MQBrokerException when the remote side answers with a non-success code
+     */
+    public TopicConfigSerializeWrapper getAllTopicConfig(final String addr) throws RemotingConnectException, RemotingSendRequestException,
+            RemotingTimeoutException, InterruptedException, MQBrokerException {
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);
+        RemotingCommand response =
+                this.invokeSyncExpectingSuccess(MixAll.brokerVIPChannel(true, addr), request, DEFAULT_SYNC_TIMEOUT_MILLIS);
+        return TopicConfigSerializeWrapper.decode(response.getBody(), TopicConfigSerializeWrapper.class);
+    }
+
+    /**
+     * Fetches all consumer offsets from {@code addr}.
+     *
+     * @throws MQBrokerException when the remote side answers with a non-success code
+     */
+    public ConsumerOffsetSerializeWrapper getAllConsumerOffset(final String addr) throws InterruptedException, RemotingTimeoutException,
+            RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_CONSUMER_OFFSET, null);
+        RemotingCommand response = this.invokeSyncExpectingSuccess(addr, request, DEFAULT_SYNC_TIMEOUT_MILLIS);
+        return ConsumerOffsetSerializeWrapper.decode(response.getBody(), ConsumerOffsetSerializeWrapper.class);
+    }
+
+    /**
+     * Fetches the delay offsets from {@code addr}.
+     *
+     * @return the response body decoded with {@code MixAll.DEFAULT_CHARSET}
+     * @throws MQBrokerException when the remote side answers with a non-success code
+     */
+    public String getAllDelayOffset(final String addr) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
+            RemotingConnectException, MQBrokerException, UnsupportedEncodingException {
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_DELAY_OFFSET, null);
+        RemotingCommand response = this.invokeSyncExpectingSuccess(addr, request, DEFAULT_SYNC_TIMEOUT_MILLIS);
+        return new String(response.getBody(), MixAll.DEFAULT_CHARSET);
+    }
+
+    /**
+     * Fetches all subscription group configuration from {@code addr}.
+     *
+     * @throws MQBrokerException when the remote side answers with a non-success code
+     */
+    public SubscriptionGroupWrapper getAllSubscriptionGroupConfig(final String addr) throws InterruptedException, RemotingTimeoutException,
+            RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null);
+        RemotingCommand response = this.invokeSyncExpectingSuccess(addr, request, DEFAULT_SYNC_TIMEOUT_MILLIS);
+        return SubscriptionGroupWrapper.decode(response.getBody(), SubscriptionGroupWrapper.class);
+    }
+
+    /** Registers {@code rpcHook} on the underlying remoting client. */
+    public void registerRPCHook(RPCHook rpcHook) {
+        remotingClient.registerRPCHook(rpcHook);
+    }
+
+    /**
+     * Performs a synchronous call and returns the response only when its code is
+     * {@code ResponseCode.SUCCESS}.
+     *
+     * @throws MQBrokerException carrying the response code and remark for any other code
+     */
+    private RemotingCommand invokeSyncExpectingSuccess(final String addr, final RemotingCommand request, final int timeoutMillis)
+            throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
+            MQBrokerException {
+        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
+        assert response != null;
+        if (ResponseCode.SUCCESS == response.getCode()) {
+            return response;
+        }
+
+        throw new MQBrokerException(response.getCode(), response.getRemark());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/pagecache/ManyMessageTransfer.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pagecache/ManyMessageTransfer.java b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/ManyMessageTransfer.java
new file mode 100644
index 0000000..d26eab8
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/ManyMessageTransfer.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.broker.pagecache;
+
+import org.apache.rocketmq.store.GetMessageResult;
+import io.netty.channel.FileRegion;
+import io.netty.util.AbstractReferenceCounted;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.List;
+
+
+/**
+ * @author shijia.wxr
+ */
+/**
+ * {@link FileRegion} that writes a header buffer followed by the message buffers of a
+ * {@link GetMessageResult} to a channel, one buffer per {@link #transferTo} call.
+ */
+public class ManyMessageTransfer extends AbstractReferenceCounted implements FileRegion {
+    private final ByteBuffer byteBufferHeader;
+    private final GetMessageResult getMessageResult;
+    private long transfered; // total number of bytes written to the target so far
+
+
+    /**
+     * @param byteBufferHeader header bytes written before any message buffer
+     * @param getMessageResult source of the message buffers; released in {@link #deallocate()}
+     */
+    public ManyMessageTransfer(ByteBuffer byteBufferHeader, GetMessageResult getMessageResult) {
+        this.byteBufferHeader = byteBufferHeader;
+        this.getMessageResult = getMessageResult;
+    }
+
+
+    /**
+     * @return the sum of the current positions of the header buffer and of every message buffer
+     */
+    @Override
+    public long position() {
+        // Accumulate in a long so that many large buffers cannot overflow an int.
+        long pos = byteBufferHeader.position();
+        List<ByteBuffer> messageBufferList = this.getMessageResult.getMessageBufferList();
+        for (ByteBuffer bb : messageBufferList) {
+            pos += bb.position();
+        }
+        return pos;
+    }
+
+    /**
+     * @return the total number of bytes written by all {@link #transferTo} calls so far
+     */
+    @Override
+    public long transfered() {
+        return transfered;
+    }
+
+    /**
+     * @return the header buffer's limit plus the total size reported by the message result
+     */
+    @Override
+    public long count() {
+        // Widen before adding so the sum is computed in long arithmetic.
+        return (long) byteBufferHeader.limit() + this.getMessageResult.getBufferTotalSize();
+    }
+
+    /**
+     * Writes the first buffer that still has remaining bytes (header first, then the message
+     * buffers in list order) to {@code target}.
+     *
+     * @param target   channel to write to
+     * @param position ignored; each buffer tracks its own position
+     * @return the number of bytes written by this call (possibly 0 when the channel accepts
+     *         nothing, or when every buffer is already drained)
+     * @throws IOException if the channel write fails
+     */
+    @Override
+    public long transferTo(WritableByteChannel target, long position) throws IOException {
+        if (this.byteBufferHeader.hasRemaining()) {
+            return writeAndCount(target, this.byteBufferHeader);
+        }
+
+        List<ByteBuffer> messageBufferList = this.getMessageResult.getMessageBufferList();
+        for (ByteBuffer bb : messageBufferList) {
+            if (bb.hasRemaining()) {
+                return writeAndCount(target, bb);
+            }
+        }
+
+        return 0;
+    }
+
+    /**
+     * Writes {@code buffer} to {@code target}, adds the written byte count to the running
+     * total and returns the count of this single write. Returning the per-call amount (not
+     * the running total) lets the caller recognise a zero-byte write as "no progress".
+     */
+    private long writeAndCount(WritableByteChannel target, ByteBuffer buffer) throws IOException {
+        final int written = target.write(buffer);
+        transfered += written;
+        return written;
+    }
+
+    /**
+     * Releases the message result immediately by invoking {@link #deallocate()} directly.
+     * NOTE(review): this bypasses the reference count, so a later release to zero would
+     * call {@code deallocate()} a second time - confirm callers never do both.
+     */
+    public void close() {
+        this.deallocate();
+    }
+
+    /**
+     * Releases the wrapped {@link GetMessageResult}.
+     */
+    @Override
+    protected void deallocate() {
+        this.getMessageResult.release();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/pagecache/OneMessageTransfer.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pagecache/OneMessageTransfer.java b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/OneMessageTransfer.java
new file mode 100644
index 0000000..97d1faa
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/OneMessageTransfer.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.broker.pagecache;
+
+import org.apache.rocketmq.store.SelectMappedBufferResult;
+import io.netty.channel.FileRegion;
+import io.netty.util.AbstractReferenceCounted;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+
+
+/**
+ * @author shijia.wxr
+ */
+/**
+ * {@link FileRegion} that writes a header buffer followed by the single buffer of a
+ * {@link SelectMappedBufferResult} to a channel, one buffer per {@link #transferTo} call.
+ */
+public class OneMessageTransfer extends AbstractReferenceCounted implements FileRegion {
+    private final ByteBuffer byteBufferHeader;
+    private final SelectMappedBufferResult selectMappedBufferResult;
+    private long transfered; // total number of bytes written to the target so far
+
+
+    /**
+     * @param byteBufferHeader         header bytes written before the message buffer
+     * @param selectMappedBufferResult source of the message buffer; released in {@link #deallocate()}
+     */
+    public OneMessageTransfer(ByteBuffer byteBufferHeader, SelectMappedBufferResult selectMappedBufferResult) {
+        this.byteBufferHeader = byteBufferHeader;
+        this.selectMappedBufferResult = selectMappedBufferResult;
+    }
+
+
+    /**
+     * @return the sum of the current positions of the header buffer and the message buffer
+     */
+    @Override
+    public long position() {
+        // Widen before adding so the sum is computed in long arithmetic.
+        return (long) this.byteBufferHeader.position() + this.selectMappedBufferResult.getByteBuffer().position();
+    }
+
+    /**
+     * @return the total number of bytes written by all {@link #transferTo} calls so far
+     */
+    @Override
+    public long transfered() {
+        return transfered;
+    }
+
+    /**
+     * @return the header buffer's limit plus the size reported by the mapped buffer result
+     */
+    @Override
+    public long count() {
+        return (long) this.byteBufferHeader.limit() + this.selectMappedBufferResult.getSize();
+    }
+
+    /**
+     * Writes the header while it has remaining bytes, otherwise the message buffer.
+     *
+     * @param target   channel to write to
+     * @param position ignored; each buffer tracks its own position
+     * @return the number of bytes written by this call (possibly 0 when the channel accepts
+     *         nothing, or when both buffers are already drained)
+     * @throws IOException if the channel write fails
+     */
+    @Override
+    public long transferTo(WritableByteChannel target, long position) throws IOException {
+        if (this.byteBufferHeader.hasRemaining()) {
+            return writeAndCount(target, this.byteBufferHeader);
+        }
+
+        final ByteBuffer messageBuffer = this.selectMappedBufferResult.getByteBuffer();
+        if (messageBuffer.hasRemaining()) {
+            return writeAndCount(target, messageBuffer);
+        }
+
+        return 0;
+    }
+
+    /**
+     * Writes {@code buffer} to {@code target}, adds the written byte count to the running
+     * total and returns the count of this single write. Returning the per-call amount (not
+     * the running total) lets the caller recognise a zero-byte write as "no progress".
+     */
+    private long writeAndCount(WritableByteChannel target, ByteBuffer buffer) throws IOException {
+        final int written = target.write(buffer);
+        transfered += written;
+        return written;
+    }
+
+    /**
+     * Releases the mapped buffer result immediately by invoking {@link #deallocate()} directly.
+     * NOTE(review): this bypasses the reference count, so a later release to zero would
+     * call {@code deallocate()} a second time - confirm callers never do both.
+     */
+    public void close() {
+        this.deallocate();
+    }
+
+    /**
+     * Releases the wrapped {@link SelectMappedBufferResult}.
+     */
+    @Override
+    protected void deallocate() {
+        this.selectMappedBufferResult.release();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/pagecache/QueryMessageTransfer.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pagecache/QueryMessageTransfer.java b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/QueryMessageTransfer.java
new file mode 100644
index 0000000..2d21c19
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/QueryMessageTransfer.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.broker.pagecache;
+
+import org.apache.rocketmq.store.QueryMessageResult;
+import io.netty.channel.FileRegion;
+import io.netty.util.AbstractReferenceCounted;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.List;
+
+
+/**
+ * @author shijia.wxr
+ */
+/**
+ * {@link FileRegion} that writes a header buffer followed by the message buffers of a
+ * {@link QueryMessageResult} to a channel, one buffer per {@link #transferTo} call.
+ */
+public class QueryMessageTransfer extends AbstractReferenceCounted implements FileRegion {
+    private final ByteBuffer byteBufferHeader;
+    private final QueryMessageResult queryMessageResult;
+    private long transfered; // total number of bytes written to the target so far
+
+
+    /**
+     * @param byteBufferHeader   header bytes written before any message buffer
+     * @param queryMessageResult source of the message buffers; released in {@link #deallocate()}
+     */
+    public QueryMessageTransfer(ByteBuffer byteBufferHeader, QueryMessageResult queryMessageResult) {
+        this.byteBufferHeader = byteBufferHeader;
+        this.queryMessageResult = queryMessageResult;
+    }
+
+
+    /**
+     * @return the sum of the current positions of the header buffer and of every message buffer
+     */
+    @Override
+    public long position() {
+        // Accumulate in a long so that many large buffers cannot overflow an int.
+        long pos = byteBufferHeader.position();
+        List<ByteBuffer> messageBufferList = this.queryMessageResult.getMessageBufferList();
+        for (ByteBuffer bb : messageBufferList) {
+            pos += bb.position();
+        }
+        return pos;
+    }
+
+    /**
+     * @return the total number of bytes written by all {@link #transferTo} calls so far
+     */
+    @Override
+    public long transfered() {
+        return transfered;
+    }
+
+    /**
+     * @return the header buffer's limit plus the total size reported by the query result
+     */
+    @Override
+    public long count() {
+        // Widen before adding so the sum is computed in long arithmetic.
+        return (long) byteBufferHeader.limit() + this.queryMessageResult.getBufferTotalSize();
+    }
+
+    /**
+     * Writes the first buffer that still has remaining bytes (header first, then the message
+     * buffers in list order) to {@code target}.
+     *
+     * @param target   channel to write to
+     * @param position ignored; each buffer tracks its own position
+     * @return the number of bytes written by this call (possibly 0 when the channel accepts
+     *         nothing, or when every buffer is already drained)
+     * @throws IOException if the channel write fails
+     */
+    @Override
+    public long transferTo(WritableByteChannel target, long position) throws IOException {
+        if (this.byteBufferHeader.hasRemaining()) {
+            return writeAndCount(target, this.byteBufferHeader);
+        }
+
+        List<ByteBuffer> messageBufferList = this.queryMessageResult.getMessageBufferList();
+        for (ByteBuffer bb : messageBufferList) {
+            if (bb.hasRemaining()) {
+                return writeAndCount(target, bb);
+            }
+        }
+
+        return 0;
+    }
+
+    /**
+     * Writes {@code buffer} to {@code target}, adds the written byte count to the running
+     * total and returns the count of this single write. Returning the per-call amount (not
+     * the running total) lets the caller recognise a zero-byte write as "no progress".
+     */
+    private long writeAndCount(WritableByteChannel target, ByteBuffer buffer) throws IOException {
+        final int written = target.write(buffer);
+        transfered += written;
+        return written;
+    }
+
+    /**
+     * Releases the query result immediately by invoking {@link #deallocate()} directly.
+     * NOTE(review): this bypasses the reference count, so a later release to zero would
+     * call {@code deallocate()} a second time - confirm callers never do both.
+     */
+    public void close() {
+        this.deallocate();
+    }
+
+    /**
+     * Releases the wrapped {@link QueryMessageResult}.
+     */
+    @Override
+    protected void deallocate() {
+        this.queryMessageResult.release();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
new file mode 100644
index 0000000..601e2f3
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ *
+ */
+package org.apache.rocketmq.broker.plugin;
+
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.store.*;
+
+import java.util.HashMap;
+import java.util.Set;
+
+/**
+ * Base class for message store plugins: every {@link MessageStore} operation is forwarded
+ * unchanged to the wrapped {@code next} store, so a subclass only overrides the calls it
+ * wants to intercept.
+ */
+public abstract class AbstractPluginMessageStore implements MessageStore {
+    /** The store every call is delegated to. */
+    protected MessageStore next;
+    /** Plugin context supplied at construction time. */
+    protected MessageStorePluginContext context;
+
+    /**
+     * @param context plugin context kept for subclasses
+     * @param next    store that receives every delegated call
+     */
+    public AbstractPluginMessageStore(MessageStorePluginContext context, MessageStore next) {
+        this.context = context;
+        this.next = next;
+    }
+
+    @Override
+    public long getEarliestMessageTime() {
+        return this.next.getEarliestMessageTime();
+    }
+
+    @Override
+    public long lockTimeMills() {
+        return this.next.lockTimeMills();
+    }
+
+    @Override
+    public boolean isOSPageCacheBusy() {
+        return this.next.isOSPageCacheBusy();
+    }
+
+    @Override
+    public boolean isTransientStorePoolDeficient() {
+        return this.next.isTransientStorePoolDeficient();
+    }
+
+    @Override
+    public boolean load() {
+        return this.next.load();
+    }
+
+    @Override
+    public void start() throws Exception {
+        this.next.start();
+    }
+
+    @Override
+    public void shutdown() {
+        this.next.shutdown();
+    }
+
+    @Override
+    public void destroy() {
+        this.next.destroy();
+    }
+
+    @Override
+    public PutMessageResult putMessage(MessageExtBrokerInner msg) {
+        return this.next.putMessage(msg);
+    }
+
+    @Override
+    public GetMessageResult getMessage(String group, String topic, int queueId, long offset, int maxMsgNums, SubscriptionData subscriptionData) {
+        return this.next.getMessage(group, topic, queueId, offset, maxMsgNums, subscriptionData);
+    }
+
+    @Override
+    public long getMaxOffsetInQuque(String topic, int queueId) {
+        return this.next.getMaxOffsetInQuque(topic, queueId);
+    }
+
+    @Override
+    public long getMinOffsetInQuque(String topic, int queueId) {
+        return this.next.getMinOffsetInQuque(topic, queueId);
+    }
+
+    @Override
+    public long getCommitLogOffsetInQueue(String topic, int queueId, long cqOffset) {
+        return this.next.getCommitLogOffsetInQueue(topic, queueId, cqOffset);
+    }
+
+    @Override
+    public long getOffsetInQueueByTime(String topic, int queueId, long timestamp) {
+        return this.next.getOffsetInQueueByTime(topic, queueId, timestamp);
+    }
+
+    @Override
+    public MessageExt lookMessageByOffset(long commitLogOffset) {
+        return this.next.lookMessageByOffset(commitLogOffset);
+    }
+
+    @Override
+    public SelectMappedBufferResult selectOneMessageByOffset(long commitLogOffset) {
+        return this.next.selectOneMessageByOffset(commitLogOffset);
+    }
+
+    @Override
+    public SelectMappedBufferResult selectOneMessageByOffset(long commitLogOffset, int msgSize) {
+        return this.next.selectOneMessageByOffset(commitLogOffset, msgSize);
+    }
+
+    @Override
+    public String getRunningDataInfo() {
+        return this.next.getRunningDataInfo();
+    }
+
+    @Override
+    public HashMap<String, String> getRuntimeInfo() {
+        return this.next.getRuntimeInfo();
+    }
+
+    @Override
+    public long getMaxPhyOffset() {
+        return this.next.getMaxPhyOffset();
+    }
+
+    @Override
+    public long getMinPhyOffset() {
+        return this.next.getMinPhyOffset();
+    }
+
+    @Override
+    public long getEarliestMessageTime(String topic, int queueId) {
+        return this.next.getEarliestMessageTime(topic, queueId);
+    }
+
+    @Override
+    public long getMessageStoreTimeStamp(String topic, int queueId, long offset) {
+        return this.next.getMessageStoreTimeStamp(topic, queueId, offset);
+    }
+
+    @Override
+    public long getMessageTotalInQueue(String topic, int queueId) {
+        return this.next.getMessageTotalInQueue(topic, queueId);
+    }
+
+    @Override
+    public SelectMappedBufferResult getCommitLogData(long offset) {
+        return this.next.getCommitLogData(offset);
+    }
+
+    @Override
+    public boolean appendToCommitLog(long startOffset, byte[] data) {
+        return this.next.appendToCommitLog(startOffset, data);
+    }
+
+    @Override
+    public void excuteDeleteFilesManualy() {
+        this.next.excuteDeleteFilesManualy();
+    }
+
+    @Override
+    public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, long end) {
+        return this.next.queryMessage(topic, key, maxNum, begin, end);
+    }
+
+    @Override
+    public void updateHaMasterAddress(String newAddr) {
+        this.next.updateHaMasterAddress(newAddr);
+    }
+
+    @Override
+    public long slaveFallBehindMuch() {
+        return this.next.slaveFallBehindMuch();
+    }
+
+    @Override
+    public long now() {
+        return this.next.now();
+    }
+
+    @Override
+    public int cleanUnusedTopic(Set<String> topics) {
+        return this.next.cleanUnusedTopic(topics);
+    }
+
+    @Override
+    public void cleanExpiredConsumerQueue() {
+        this.next.cleanExpiredConsumerQueue();
+    }
+
+    @Override
+    public boolean checkInDiskByConsumeOffset(String topic, int queueId, long consumeOffset) {
+        return this.next.checkInDiskByConsumeOffset(topic, queueId, consumeOffset);
+    }
+
+    @Override
+    public long dispatchBehindBytes() {
+        return this.next.dispatchBehindBytes();
+    }
+
+    @Override
+    public long flush() {
+        return this.next.flush();
+    }
+
+    @Override
+    public boolean resetWriteOffset(long phyOffset) {
+        return this.next.resetWriteOffset(phyOffset);
+    }
+
+    @Override
+    public long getConfirmOffset() {
+        return this.next.getConfirmOffset();
+    }
+
+    @Override
+    public void setConfirmOffset(long phyOffset) {
+        this.next.setConfirmOffset(phyOffset);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStoreFactory.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStoreFactory.java b/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStoreFactory.java
new file mode 100644
index 0000000..d27b6aa
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStoreFactory.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ *
+ */
+package org.apache.rocketmq.broker.plugin;
+
+import org.apache.rocketmq.store.MessageStore;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+
+/**
+ * Builds the chain of {@link AbstractPluginMessageStore} wrappers configured for a broker.
+ */
+public final class MessageStoreFactory {
+    /**
+     * Wraps {@code messageStore} in the plugin stores named by the broker configuration.
+     *
+     * <p>The configured value is a comma-separated list of class names; whitespace around
+     * each name is ignored. The list is walked from last to first, so the first listed
+     * plugin ends up as the outermost wrapper. Each class must extend
+     * {@link AbstractPluginMessageStore} and expose a public
+     * {@code (MessageStorePluginContext, MessageStore)} constructor.
+     *
+     * @param context      plugin context; supplies the broker configuration and is passed to
+     *                     every plugin constructor
+     * @param messageStore innermost store to wrap
+     * @return the outermost plugin store, or {@code messageStore} itself when no plugin is
+     *         configured
+     * @throws RuntimeException if any plugin class cannot be loaded, is not an
+     *                          {@link AbstractPluginMessageStore}, or cannot be instantiated
+     * @throws IOException      declared for callers; not thrown by this implementation
+     */
+    public final static MessageStore build(MessageStorePluginContext context, MessageStore messageStore)
+            throws IOException {
+        String plugin = context.getBrokerConfig().getMessageStorePlugIn();
+        if (plugin != null && plugin.trim().length() != 0) {
+            String[] pluginClasses = plugin.split(",");
+            for (int i = pluginClasses.length - 1; i >= 0; --i) {
+                // Tolerate whitespace around the separators, e.g. "a.Foo, b.Bar".
+                String pluginClass = pluginClasses[i].trim();
+                try {
+                    // asSubclass rejects a non-plugin class before any instance is constructed.
+                    Class<? extends AbstractPluginMessageStore> clazz =
+                            Class.forName(pluginClass).asSubclass(AbstractPluginMessageStore.class);
+                    Constructor<? extends AbstractPluginMessageStore> construct =
+                            clazz.getConstructor(MessageStorePluginContext.class, MessageStore.class);
+                    messageStore = construct.newInstance(context, messageStore);
+                } catch (Throwable e) {
+                    throw new RuntimeException(String.format(
+                            "Initialize plugin's class %s not found!", pluginClass), e);
+                }
+            }
+        }
+        return messageStore;
+    }
+}