You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 09:14:19 UTC

[28/99] [abbrv] [partial] incubator-rocketmq git commit: ROCKETMQ-18 Rename package name from com.alibaba to org.apache

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java
new file mode 100644
index 0000000..3784752
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer.rebalance;
+
+import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import java.util.List;
+
+
+/**
+ * Strategy whose allocate() ignores its arguments and returns the configured queue list.
+ * @author shijia.wxr
+ */
+public class AllocateMessageQueueByConfig implements AllocateMessageQueueStrategy {
+    private List<MessageQueue> messageQueueList;
+
+    public void setMessageQueueList(List<MessageQueue> messageQueueList) {
+        this.messageQueueList = messageQueueList;
+    }
+
+    public List<MessageQueue> getMessageQueueList() {
+        return this.messageQueueList;
+    }
+
+    @Override
+    public String getName() {
+        return "CONFIG";
+    }
+
+    @Override
+    public List<MessageQueue> allocate(String consumerGroup, String currentCID,
+                                       List<MessageQueue> mqAll, List<String> cidAll) {
+        return messageQueueList;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoom.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoom.java b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoom.java
new file mode 100644
index 0000000..d3448c9
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoom.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer.rebalance;
+
+import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+
+/**
+ * Allocates only queues whose broker name is {@code <idc>@<broker>} with idc in consumeridcs.
+ */
+public class AllocateMessageQueueByMachineRoom implements AllocateMessageQueueStrategy {
+    private Set<String> consumeridcs;
+
+    /**
+     * Each consumer gets {@code size / n} consecutive eligible queues; the first {@code size % n}
+     * consumers get one more from the tail. Returns empty if currentCID is not in cidAll.
+     */
+    @Override
+    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
+                                       List<String> cidAll) {
+        List<MessageQueue> result = new ArrayList<MessageQueue>();
+        int currentIndex = cidAll.indexOf(currentCID);
+        if (currentIndex < 0) {
+            return result;
+        }
+        List<MessageQueue> premqAll = new ArrayList<MessageQueue>();
+        for (MessageQueue mq : mqAll) {
+            String[] temp = mq.getBrokerName().split("@");
+            if (temp.length == 2 && consumeridcs.contains(temp[0])) {
+                premqAll.add(mq);
+            }
+        }
+        int mod = premqAll.size() / cidAll.size();
+        int rem = premqAll.size() % cidAll.size();
+        int startindex = mod * currentIndex;
+        // Slice the filtered list: indexing mqAll here could hand out queues from other rooms.
+        result.addAll(premqAll.subList(startindex, startindex + mod));
+        if (rem > currentIndex) {
+            result.add(premqAll.get(currentIndex + mod * cidAll.size()));
+        }
+        return result;
+    }
+
+    @Override
+    public String getName() {
+        return "MACHINE_ROOM";
+    }
+
+    public Set<String> getConsumeridcs() {
+        return consumeridcs;
+    }
+
+    public void setConsumeridcs(Set<String> consumeridcs) {
+        this.consumeridcs = consumeridcs;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java
new file mode 100644
index 0000000..f4d87e7
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer.store;
+
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.help.FAQUrl;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.slf4j.Logger;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+/**
+ * {@link OffsetStore} that keeps consume offsets in memory and in a JSON file on local disk.
+ *
+ * @author shijia.wxr
+ */
+public class LocalFileOffsetStore implements OffsetStore {
+    public final static String LOCAL_OFFSET_STORE_DIR = System.getProperty(
+            "rocketmq.client.localOffsetStoreDir",
+            System.getProperty("user.home") + File.separator + ".rocketmq_offsets");
+    private final static Logger log = ClientLogger.getLog();
+    private final MQClientInstance mQClientFactory;
+    private final String groupName;
+    private final String storePath;
+    private ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable =
+            new ConcurrentHashMap<MessageQueue, AtomicLong>();
+
+    /** Derives the offsets file path from the store dir, the client id and the group name. */
+    public LocalFileOffsetStore(MQClientInstance mQClientFactory, String groupName) {
+        this.mQClientFactory = mQClientFactory;
+        this.groupName = groupName;
+        this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator
+                + mQClientFactory.getClientId() + File.separator
+                + groupName + File.separator
+                + "offsets.json";
+    }
+
+    /** Reads the persisted offsets (if any) into the in-memory table and logs each one. */
+    @Override
+    public void load() throws MQClientException {
+        final OffsetSerializeWrapper wrapper = this.readLocalOffset();
+        if (null == wrapper) {
+            return;
+        }
+        final ConcurrentHashMap<MessageQueue, AtomicLong> stored = wrapper.getOffsetTable();
+        if (null == stored) {
+            return;
+        }
+        this.offsetTable.putAll(stored);
+        for (Map.Entry<MessageQueue, AtomicLong> entry : stored.entrySet()) {
+            log.info("load consumer's offset, {} {} {}", this.groupName, entry.getKey(), entry.getValue().get());
+        }
+    }
+
+    /** Records the offset of {@code mq} in memory; a {@code null} queue is ignored. */
+    @Override
+    public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
+        if (null == mq) {
+            return;
+        }
+        AtomicLong current = this.offsetTable.get(mq);
+        if (current == null) {
+            current = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
+        }
+        if (current == null) {
+            return; // the fresh counter above was stored; nothing left to update
+        } else if (increaseOnly) {
+            MixAll.compareAndIncreaseOnly(current, offset);
+        } else {
+            current.set(offset);
+        }
+    }
+
+    /** Looks up the offset of {@code mq} in memory and/or the local file per {@code type}; -1 if none. */
+    @Override
+    public long readOffset(final MessageQueue mq, final ReadOffsetType type) {
+        if (mq != null) {
+            switch (type) {
+                case MEMORY_FIRST_THEN_STORE:
+                case READ_FROM_MEMORY: {
+                    AtomicLong cached = this.offsetTable.get(mq);
+                    if (cached != null) {
+                        return cached.get();
+                    } else if (ReadOffsetType.READ_FROM_MEMORY == type) {
+                        return -1;
+                    }
+                } // MEMORY_FIRST_THEN_STORE miss: fall through to the file
+                case READ_FROM_STORE: {
+                    OffsetSerializeWrapper wrapper;
+                    try {
+                        wrapper = this.readLocalOffset();
+                    } catch (MQClientException e) {
+                        return -1;
+                    }
+                    if (wrapper != null && wrapper.getOffsetTable() != null) {
+                        AtomicLong stored = wrapper.getOffsetTable().get(mq);
+                        if (stored != null) {
+                            this.updateOffset(mq, stored.get(), false);
+                            return stored.get();
+                        }
+                    }
+                } // not in the file either: fall through to the final -1
+                default:
+                    break;
+            }
+        }
+
+        return -1;
+    }
+
+    /** Serializes the in-memory offsets of the queues in {@code mqs} to JSON and stores it at storePath. */
+    @Override
+    public void persistAll(Set<MessageQueue> mqs) {
+        if (null == mqs || mqs.isEmpty()) {
+            return;
+        }
+
+        final OffsetSerializeWrapper snapshot = new OffsetSerializeWrapper();
+        for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
+            final MessageQueue mq = entry.getKey();
+            if (mqs.contains(mq)) {
+                snapshot.getOffsetTable().put(mq, entry.getValue());
+            }
+        }
+        final String json = snapshot.toJson(true);
+        if (json != null) {
+            try {
+                MixAll.string2File(json, this.storePath);
+            } catch (IOException e) {
+                log.error("persistAll consumer offset Exception, " + this.storePath, e);
+            }
+        }
+    }
+
+    /** No-op in this store. */
+    @Override
+    public void persist(MessageQueue mq) {
+    }
+
+    /** No-op in this store. */
+    @Override
+    public void removeOffset(MessageQueue mq) {
+    }
+
+    /** No-op in this store. */
+    @Override
+    public void updateConsumeOffsetToBroker(final MessageQueue mq, final long offset, final boolean isOneway)
+            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+    }
+
+    /** Snapshots the in-memory offsets; a non-blank {@code topic} limits the copy to that topic. */
+    @Override
+    public Map<MessageQueue, Long> cloneOffsetTable(String topic) {
+        final Map<MessageQueue, Long> copy = new HashMap<MessageQueue, Long>();
+        for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
+            final MessageQueue mq = entry.getKey();
+            if (UtilAll.isBlank(topic) || topic.equals(mq.getTopic())) {
+                copy.put(mq, entry.getValue().get());
+            }
+        }
+
+        return copy;
+    }
+
+    /**
+     * Parses the primary offsets file. Falls back to {@link #readLocalOffsetBak()} when the
+     * content is null/empty or cannot be parsed.
+     */
+    private OffsetSerializeWrapper readLocalOffset() throws MQClientException {
+        final String content = MixAll.file2String(this.storePath);
+        if (null == content || content.length() == 0) {
+            return this.readLocalOffsetBak();
+        }
+
+        try {
+            return OffsetSerializeWrapper.fromJson(content, OffsetSerializeWrapper.class);
+        } catch (Exception e) {
+            log.warn("readLocalOffset Exception, and try to correct", e);
+            return this.readLocalOffsetBak();
+        }
+    }
+
+    /**
+     * Parses the ".bak" offsets file.
+     * @return the parsed wrapper, or {@code null} when the backup content is null or empty
+     * @throws MQClientException if the backup content cannot be parsed
+     */
+    private OffsetSerializeWrapper readLocalOffsetBak() throws MQClientException {
+        final String content = MixAll.file2String(this.storePath + ".bak");
+        if (null == content || content.length() == 0) {
+            return null;
+        }
+        try {
+            return OffsetSerializeWrapper.fromJson(content, OffsetSerializeWrapper.class);
+        } catch (Exception e) {
+            log.warn("readLocalOffset Exception", e);
+            final String hint = FAQUrl.suggestTodo(FAQUrl.LOAD_JSON_EXCEPTION);
+            throw new MQClientException("readLocalOffset Exception, maybe fastjson version too low" + hint, e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetSerializeWrapper.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetSerializeWrapper.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetSerializeWrapper.java
new file mode 100644
index 0000000..e69ad23
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetSerializeWrapper.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer.store;
+
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+/**
+ * Serializable holder for a queue-to-offset table (see {@link RemotingSerializable}).
+ *
+ * @author shijia.wxr
+ */
+public class OffsetSerializeWrapper extends RemotingSerializable {
+    private ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable =
+            new ConcurrentHashMap<MessageQueue, AtomicLong>();
+
+    public void setOffsetTable(ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable) {
+        this.offsetTable = offsetTable;
+    }
+
+    public ConcurrentHashMap<MessageQueue, AtomicLong> getOffsetTable() {
+        return this.offsetTable;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetStore.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetStore.java
new file mode 100644
index 0000000..7c7ccc6
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetStore.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer.store;
+
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+
+import java.util.Map;
+import java.util.Set;
+
+
+/**
+ * Contract for keeping, reading and persisting the consume offset of message queues.
+ *
+ * @author shijia.wxr
+ */
+public interface OffsetStore {
+    /**
+     * Loads previously persisted offsets, if the implementation keeps any.
+     *
+     * @throws MQClientException if the persisted offsets cannot be read
+     */
+    void load() throws MQClientException;
+
+
+    /**
+     * Updates the offset of a queue in memory.
+     *
+     * @param mq the queue whose offset changes
+     * @param offset the new offset
+     * @param increaseOnly if true, the update is applied through an increase-only comparison
+     */
+    void updateOffset(final MessageQueue mq, final long offset, final boolean increaseOnly);
+
+    /**
+     * Reads the offset of a queue from memory and/or the backing store.
+     *
+     * @param mq the queue to look up
+     * @param type where to look: memory, store, or memory first and then store
+     *
+     * @return The fetched offset, negative when none could be read
+     */
+    long readOffset(final MessageQueue mq, final ReadOffsetType type);
+
+    /**
+     * Persists the offsets of the given queues, to local storage or to the remote broker.
+     *
+     * @param mqs the queues whose offsets are persisted
+     */
+    void persistAll(final Set<MessageQueue> mqs);
+
+    /**
+     * Persists the offset of one queue, to local storage or to the remote broker.
+     *
+     * @param mq the queue whose offset is persisted
+     */
+    void persist(final MessageQueue mq);
+
+    /**
+     * Removes the offset kept for a queue.
+     *
+     * @param mq the queue whose offset is dropped
+     */
+    void removeOffset(MessageQueue mq);
+
+    /**
+     * @param topic topic to restrict the copy to; a blank topic selects every queue
+     *
+     * @return The cloned offset table of given topic
+     */
+    Map<MessageQueue, Long> cloneOffsetTable(String topic);
+
+    /**
+     * Sends the given offset of a queue to its broker.
+     * @param mq the queue the offset belongs to
+     * @param offset the offset to report
+     * @param isOneway true to send one-way, false to send a regular request
+     */
+    void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway)  throws RemotingException,
+            MQBrokerException, InterruptedException, MQClientException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/store/ReadOffsetType.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/ReadOffsetType.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/ReadOffsetType.java
new file mode 100644
index 0000000..c2ee9b7
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/ReadOffsetType.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer.store;
+
+/** Selects where an {@code OffsetStore} looks when reading an offset. */
+public enum ReadOffsetType {
+    /** Read from memory only. */
+    READ_FROM_MEMORY,
+
+    /** Read from the backing store only. */
+    READ_FROM_STORE,
+
+    /**
+     * Read from memory first and, if that misses, from the
+     * backing store.
+     */
+    MEMORY_FIRST_THEN_STORE
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
new file mode 100644
index 0000000..082e7e8
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer.store;
+
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.FindBrokerResult;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.slf4j.Logger;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+/**
+ * {@link OffsetStore} that caches consume offsets in memory and reads/writes them via the broker.
+ *
+ * @author shijia.wxr
+ */
+public class RemoteBrokerOffsetStore implements OffsetStore {
+    private final static Logger log = ClientLogger.getLog();
+    private final MQClientInstance mQClientFactory;
+    private final String groupName;
+    private final ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable =
+            new ConcurrentHashMap<MessageQueue, AtomicLong>();
+
+    /** Creates a store for {@code groupName} that talks to brokers through {@code mQClientFactory}. */
+    public RemoteBrokerOffsetStore(MQClientInstance mQClientFactory, String groupName) {
+        this.mQClientFactory = mQClientFactory;
+        this.groupName = groupName;
+    }
+
+    /** No-op: nothing is loaded up front; offsets are fetched from the broker on demand. */
+    @Override
+    public void load() {
+    }
+
+    /** Records the offset of {@code mq} in memory; a {@code null} queue is ignored. */
+    @Override
+    public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
+        if (null == mq) {
+            return;
+        }
+        AtomicLong current = this.offsetTable.get(mq);
+        if (current == null) {
+            current = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
+        }
+        if (current == null) {
+            return; // the fresh counter above was stored; nothing left to update
+        } else if (increaseOnly) {
+            MixAll.compareAndIncreaseOnly(current, offset);
+        } else {
+            current.set(offset);
+        }
+    }
+
+    /**
+     * Reads the offset of {@code mq} from memory and/or the broker, as selected by {@code type}.
+     * @return the offset; -1 if none is known (or {@code mq} is null), -2 if the broker query failed
+     */
+    @Override
+    public long readOffset(final MessageQueue mq, final ReadOffsetType type) {
+        if (mq != null) {
+            switch (type) {
+                case MEMORY_FIRST_THEN_STORE:
+                case READ_FROM_MEMORY: {
+                    AtomicLong offset = this.offsetTable.get(mq);
+                    if (offset != null) {
+                        return offset.get();
+                    } else if (ReadOffsetType.READ_FROM_MEMORY == type) {
+                        return -1;
+                    }
+                } // MEMORY_FIRST_THEN_STORE miss: fall through and ask the broker
+                case READ_FROM_STORE: {
+                    try {
+                        long brokerOffset = this.fetchConsumeOffsetFromBroker(mq);
+                        this.updateOffset(mq, brokerOffset, false);
+                        return brokerOffset;
+                    } catch (MQBrokerException e) {
+                        // No offset in broker
+                        return -1;
+                    } catch (Exception e) {
+                        // Other exceptions
+                        log.warn("fetchConsumeOffsetFromBroker exception, " + mq, e);
+                        return -2;
+                    }
+                }
+                default:
+                    break;
+            }
+        }
+
+        return -1;
+    }
+
+    /**
+     * Pushes the cached offset of every queue in {@code mqs} to its broker and drops cached
+     * offsets of queues that are not in {@code mqs}. Failures are logged, not thrown.
+     */
+    @Override
+    public void persistAll(Set<MessageQueue> mqs) {
+        if (null == mqs || mqs.isEmpty()) {
+            return;
+        }
+
+        final Set<MessageQueue> unusedMQ = new HashSet<MessageQueue>();
+        for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
+            MessageQueue mq = entry.getKey();
+            AtomicLong offset = entry.getValue();
+            if (offset == null) {
+                continue;
+            }
+            if (!mqs.contains(mq)) {
+                unusedMQ.add(mq);
+                continue;
+            }
+            try {
+                this.updateConsumeOffsetToBroker(mq, offset.get());
+                log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
+                        this.groupName,
+                        this.mQClientFactory.getClientId(),
+                        mq,
+                        offset.get());
+            } catch (Exception e) {
+                log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
+            }
+        }
+        for (MessageQueue mq : unusedMQ) {
+            this.offsetTable.remove(mq);
+            log.info("remove unused mq, {}, {}", mq, this.groupName);
+        }
+    }
+
+    /** Pushes the cached offset of {@code mq}, if any, to its broker; failures are logged. */
+    @Override
+    public void persist(MessageQueue mq) {
+        // ConcurrentHashMap.get(null) throws, so treat a null queue like an unknown one.
+        AtomicLong offset = (mq == null) ? null : this.offsetTable.get(mq);
+        if (offset == null) {
+            return;
+        }
+        try {
+            this.updateConsumeOffsetToBroker(mq, offset.get());
+            log.info("[persist] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
+                    this.groupName, this.mQClientFactory.getClientId(), mq, offset.get());
+        } catch (Exception e) {
+            log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
+        }
+    }
+
+    /** Drops the cached offset of {@code mq}; a {@code null} queue is ignored. */
+    @Override
+    public void removeOffset(MessageQueue mq) {
+        if (mq != null) {
+            this.offsetTable.remove(mq);
+            log.info("remove unnecessary messageQueue offset. group={}, mq={}, offsetTableSize={}", this.groupName, mq,
+                    offsetTable.size());
+        }
+    }
+
+    /** Snapshots the cached offsets; a non-blank {@code topic} limits the copy to that topic. */
+    @Override
+    public Map<MessageQueue, Long> cloneOffsetTable(String topic) {
+        Map<MessageQueue, Long> copy = new HashMap<MessageQueue, Long>();
+        for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
+            MessageQueue mq = entry.getKey();
+            if (UtilAll.isBlank(topic) || topic.equals(mq.getTopic())) {
+                copy.put(mq, entry.getValue().get());
+            }
+        }
+        return copy;
+    }
+
+    /** One-way variant of {@link #updateConsumeOffsetToBroker(MessageQueue, long, boolean)}. */
+    private void updateConsumeOffsetToBroker(MessageQueue mq, long offset) throws RemotingException,
+            MQBrokerException, InterruptedException, MQClientException {
+        updateConsumeOffsetToBroker(mq, offset, true);
+    }
+
+    /**
+     * Reports {@code offset} for {@code mq} to its broker, one-way or as a regular request depending
+     * on {@code isOneway}. Once the Master is off the update goes to a Slave; needs optimizing.
+     */
+    @Override
+    public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
+            MQBrokerException, InterruptedException, MQClientException {
+        FindBrokerResult findBrokerResult = this.findBrokerOrThrow(mq);
+
+        UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
+        requestHeader.setTopic(mq.getTopic());
+        requestHeader.setConsumerGroup(this.groupName);
+        requestHeader.setQueueId(mq.getQueueId());
+        requestHeader.setCommitOffset(offset);
+
+        if (isOneway) {
+            this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
+                    findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
+        } else {
+            this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(
+                    findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
+        }
+    }
+
+    /** Queries the broker of {@code mq} for the offset committed by this consumer group. */
+    private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException, MQBrokerException,
+            InterruptedException, MQClientException {
+        FindBrokerResult findBrokerResult = this.findBrokerOrThrow(mq);
+
+        QueryConsumerOffsetRequestHeader requestHeader = new QueryConsumerOffsetRequestHeader();
+        requestHeader.setTopic(mq.getTopic());
+        requestHeader.setConsumerGroup(this.groupName);
+        requestHeader.setQueueId(mq.getQueueId());
+
+        return this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset(
+                findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
+    }
+
+    /**
+     * Resolves the address of the broker hosting {@code mq}; on a miss the topic route is
+     * refreshed from the name server once and the lookup is retried.
+     *
+     * @throws MQClientException if the broker is still unknown after the refresh
+     */
+    private FindBrokerResult findBrokerOrThrow(MessageQueue mq) throws MQClientException {
+        FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
+        if (null == findBrokerResult) {
+            // TODO Here may be heavily overhead for Name Server,need tuning
+            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
+            findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
+        }
+        if (null == findBrokerResult) {
+            throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
+        }
+        return findBrokerResult;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/exception/MQBrokerException.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/exception/MQBrokerException.java b/client/src/main/java/org/apache/rocketmq/client/exception/MQBrokerException.java
new file mode 100644
index 0000000..5e8d1b9
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/exception/MQBrokerException.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.client.exception;
+
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.help.FAQUrl;
+
+
+/**
+ * Checked exception that carries a numeric response code together with an error description.
+ * <p>
+ * The exception message is built as {@code "CODE: <code>  DESC: <description>"} and then passed
+ * through {@link FAQUrl#attachDefaultURL}. The raw code and description remain available through
+ * the getters.
+ *
+ * @author shijia.wxr
+ */
+public class MQBrokerException extends Exception {
+    private static final long serialVersionUID = 5975020272601250368L;
+    private final int responseCode;
+    private final String errorMessage;
+
+    /**
+     * @param responseCode the response code to report
+     * @param errorMessage the description accompanying the code
+     */
+    public MQBrokerException(int responseCode, String errorMessage) {
+        super(describe(responseCode, errorMessage));
+        this.responseCode = responseCode;
+        this.errorMessage = errorMessage;
+    }
+
+    /** Builds the exception message from the code and the description. */
+    private static String describe(int responseCode, String errorMessage) {
+        final String detail = "CODE: " + UtilAll.responseCode2String(responseCode) + "  DESC: "
+                + errorMessage;
+        return FAQUrl.attachDefaultURL(detail);
+    }
+
+    /** @return the response code given at construction */
+    public int getResponseCode() {
+        return this.responseCode;
+    }
+
+    /** @return the raw description given at construction, without code prefix or URL */
+    public String getErrorMessage() {
+        return this.errorMessage;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/exception/MQClientException.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/exception/MQClientException.java b/client/src/main/java/org/apache/rocketmq/client/exception/MQClientException.java
new file mode 100644
index 0000000..5f32d12
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/exception/MQClientException.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.client.exception;
+
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.help.FAQUrl;
+
+
+/**
+ * Checked exception that carries a response code and an error description.
+ * <p>
+ * Both values are mutable after construction. The exception message itself is fixed at
+ * construction time and is not affected by later setter calls.
+ *
+ * @author shijia.wxr
+ */
+public class MQClientException extends Exception {
+    private static final long serialVersionUID = -5758410930844185841L;
+    private int responseCode;
+    private String errorMessage;
+
+    /**
+     * Creates an exception with a cause. The response code is set to {@code -1}.
+     *
+     * @param errorMessage the description; also used (with the FAQ URL attached) as the message
+     * @param cause        the underlying cause, may be {@code null}
+     */
+    public MQClientException(String errorMessage, Throwable cause) {
+        super(FAQUrl.attachDefaultURL(errorMessage), cause);
+        this.errorMessage = errorMessage;
+        this.responseCode = -1;
+    }
+
+    /**
+     * Creates an exception whose message is {@code "CODE: <code>  DESC: <description>"} with the
+     * FAQ URL attached.
+     *
+     * @param responseCode the response code to report
+     * @param errorMessage the description accompanying the code
+     */
+    public MQClientException(int responseCode, String errorMessage) {
+        super(describe(responseCode, errorMessage));
+        this.errorMessage = errorMessage;
+        this.responseCode = responseCode;
+    }
+
+    /** Builds the exception message from the code and the description. */
+    private static String describe(int responseCode, String errorMessage) {
+        final String detail = "CODE: " + UtilAll.responseCode2String(responseCode) + "  DESC: "
+                + errorMessage;
+        return FAQUrl.attachDefaultURL(detail);
+    }
+
+    /** @return the current response code */
+    public int getResponseCode() {
+        return this.responseCode;
+    }
+
+    /**
+     * Replaces the response code.
+     *
+     * @return this exception, so the call can be chained
+     */
+    public MQClientException setResponseCode(final int responseCode) {
+        this.responseCode = responseCode;
+        return this;
+    }
+
+    /** @return the current raw description */
+    public String getErrorMessage() {
+        return this.errorMessage;
+    }
+
+    /** Replaces the raw description; the exception message is left unchanged. */
+    public void setErrorMessage(final String errorMessage) {
+        this.errorMessage = errorMessage;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/hook/CheckForbiddenContext.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/hook/CheckForbiddenContext.java b/client/src/main/java/org/apache/rocketmq/client/hook/CheckForbiddenContext.java
new file mode 100644
index 0000000..8cb4ca9
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/hook/CheckForbiddenContext.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.client.hook;
+
+import org.apache.rocketmq.client.impl.CommunicationMode;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+
+/**
+ * Mutable context object handed to {@link CheckForbiddenHook#checkForbidden(CheckForbiddenContext)}.
+ * <p>
+ * This is a plain holder: every field has an unvalidated getter/setter pair, reference fields are
+ * {@code null} until set and {@code unitMode} starts as {@code false}.
+ *
+ * @author manhong.yqd
+ */
+public class CheckForbiddenContext {
+    private String nameSrvAddr;
+    private String group;
+    private Message message;
+    private MessageQueue mq;
+    private String brokerAddr;
+    private CommunicationMode communicationMode;
+    private SendResult sendResult;
+    private Exception exception;
+    private Object arg;
+    private boolean unitMode = false;
+
+
+    public String getGroup() {
+        return group;
+    }
+
+
+    public void setGroup(String group) {
+        this.group = group;
+    }
+
+
+    public Message getMessage() {
+        return message;
+    }
+
+
+    public void setMessage(Message message) {
+        this.message = message;
+    }
+
+
+    public MessageQueue getMq() {
+        return mq;
+    }
+
+
+    public void setMq(MessageQueue mq) {
+        this.mq = mq;
+    }
+
+
+    public String getBrokerAddr() {
+        return brokerAddr;
+    }
+
+
+    public void setBrokerAddr(String brokerAddr) {
+        this.brokerAddr = brokerAddr;
+    }
+
+
+    public CommunicationMode getCommunicationMode() {
+        return communicationMode;
+    }
+
+
+    public void setCommunicationMode(CommunicationMode communicationMode) {
+        this.communicationMode = communicationMode;
+    }
+
+
+    public SendResult getSendResult() {
+        return sendResult;
+    }
+
+
+    public void setSendResult(SendResult sendResult) {
+        this.sendResult = sendResult;
+    }
+
+
+    public Exception getException() {
+        return exception;
+    }
+
+
+    public void setException(Exception exception) {
+        this.exception = exception;
+    }
+
+
+    public Object getArg() {
+        return arg;
+    }
+
+
+    public void setArg(Object arg) {
+        this.arg = arg;
+    }
+
+
+    public boolean isUnitMode() {
+        return unitMode;
+    }
+
+
+    public void setUnitMode(boolean isUnitMode) {
+        this.unitMode = isUnitMode;
+    }
+
+
+    public String getNameSrvAddr() {
+        return nameSrvAddr;
+    }
+
+
+    public void setNameSrvAddr(String nameSrvAddr) {
+        this.nameSrvAddr = nameSrvAddr;
+    }
+
+
+    /**
+     * Renders all fields for diagnostics.
+     *
+     * @return a string starting with this class's own name followed by every field value
+     */
+    @Override
+    public String toString() {
+        // The label must name this class; it previously read "SendMessageContext".
+        return "CheckForbiddenContext [nameSrvAddr=" + nameSrvAddr + ", group=" + group + ", message=" + message
+                + ", mq=" + mq + ", brokerAddr=" + brokerAddr + ", communicationMode=" + communicationMode
+                + ", sendResult=" + sendResult + ", exception=" + exception + ", unitMode=" + unitMode
+                + ", arg=" + arg + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/hook/CheckForbiddenHook.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/hook/CheckForbiddenHook.java b/client/src/main/java/org/apache/rocketmq/client/hook/CheckForbiddenHook.java
new file mode 100644
index 0000000..41ed088
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/hook/CheckForbiddenHook.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.client.hook;
+
+import org.apache.rocketmq.client.exception.MQClientException;
+
+
+/**
+ * Hook that is given a {@link CheckForbiddenContext} and may veto by throwing
+ * {@link MQClientException}.
+ *
+ * @author manhong.yqd
+ */
+public interface CheckForbiddenHook {
+    /**
+     * @return the name identifying this hook
+     */
+    String hookName();
+
+    /**
+     * Inspects the given context.
+     *
+     * @param context the context to inspect
+     * @throws MQClientException declared so that an implementation can signal a failed check
+     */
+    void checkForbidden(CheckForbiddenContext context) throws MQClientException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageContext.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageContext.java b/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageContext.java
new file mode 100644
index 0000000..f141fac
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageContext.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.client.hook;
+
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Mutable context object passed to the {@link ConsumeMessageHook} callbacks.
+ * <p>
+ * Plain holder: each field has an unvalidated getter/setter pair; reference fields are
+ * {@code null} and {@code success} is {@code false} until set.
+ */
+public class ConsumeMessageContext {
+    private String consumerGroup;
+    private List<MessageExt> msgList;
+    private MessageQueue mq;
+    private boolean success;
+    private String status;
+    private Object mqTraceContext;
+    private Map<String, String> props;
+
+    // Accessors below follow the field declaration order.
+
+    public String getConsumerGroup() {
+        return this.consumerGroup;
+    }
+
+    public void setConsumerGroup(String consumerGroup) {
+        this.consumerGroup = consumerGroup;
+    }
+
+    public List<MessageExt> getMsgList() {
+        return this.msgList;
+    }
+
+    public void setMsgList(List<MessageExt> msgList) {
+        this.msgList = msgList;
+    }
+
+    public MessageQueue getMq() {
+        return this.mq;
+    }
+
+    public void setMq(MessageQueue mq) {
+        this.mq = mq;
+    }
+
+    public boolean isSuccess() {
+        return this.success;
+    }
+
+    public void setSuccess(boolean success) {
+        this.success = success;
+    }
+
+    public String getStatus() {
+        return this.status;
+    }
+
+    public void setStatus(String status) {
+        this.status = status;
+    }
+
+    public Object getMqTraceContext() {
+        return this.mqTraceContext;
+    }
+
+    public void setMqTraceContext(Object mqTraceContext) {
+        this.mqTraceContext = mqTraceContext;
+    }
+
+    public Map<String, String> getProps() {
+        return this.props;
+    }
+
+    public void setProps(Map<String, String> props) {
+        this.props = props;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageHook.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageHook.java b/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageHook.java
new file mode 100644
index 0000000..8161d2e
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/hook/ConsumeMessageHook.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.client.hook;
+
+/**
+ * Hook with a pair of callbacks that receive a {@link ConsumeMessageContext}.
+ */
+public interface ConsumeMessageHook {
+    /**
+     * @return the name identifying this hook
+     */
+    String hookName();
+
+    /**
+     * "Before" callback of the pair.
+     *
+     * @param context the consume context
+     */
+    void consumeMessageBefore(ConsumeMessageContext context);
+
+    /**
+     * "After" callback of the pair.
+     *
+     * @param context the consume context
+     */
+    void consumeMessageAfter(ConsumeMessageContext context);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/hook/FilterMessageContext.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/hook/FilterMessageContext.java b/client/src/main/java/org/apache/rocketmq/client/hook/FilterMessageContext.java
new file mode 100644
index 0000000..942fd71
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/hook/FilterMessageContext.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.client.hook;
+
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import java.util.List;
+
+
+/**
+ * Mutable context object handed to {@link FilterMessageHook#filterMessage(FilterMessageContext)}.
+ * <p>
+ * Plain holder: each field has an unvalidated getter/setter pair; reference fields are
+ * {@code null} and {@code unitMode} is {@code false} until set.
+ *
+ * @author manhong.yqd
+ */
+public class FilterMessageContext {
+    private String consumerGroup;
+    private List<MessageExt> msgList;
+    private MessageQueue mq;
+    private Object arg;
+    private boolean unitMode;
+
+
+    public String getConsumerGroup() {
+        return consumerGroup;
+    }
+
+
+    public void setConsumerGroup(String consumerGroup) {
+        this.consumerGroup = consumerGroup;
+    }
+
+
+    public List<MessageExt> getMsgList() {
+        return msgList;
+    }
+
+
+    public void setMsgList(List<MessageExt> msgList) {
+        this.msgList = msgList;
+    }
+
+
+    public MessageQueue getMq() {
+        return mq;
+    }
+
+
+    public void setMq(MessageQueue mq) {
+        this.mq = mq;
+    }
+
+
+    public Object getArg() {
+        return arg;
+    }
+
+
+    public void setArg(Object arg) {
+        this.arg = arg;
+    }
+
+
+    public boolean isUnitMode() {
+        return unitMode;
+    }
+
+
+    public void setUnitMode(boolean isUnitMode) {
+        this.unitMode = isUnitMode;
+    }
+
+
+    /**
+     * Renders all fields for diagnostics.
+     *
+     * @return a string starting with this class's own name followed by every field value
+     */
+    @Override
+    public String toString() {
+        // The label previously read "ConsumeMessageContext" and unitMode was left out.
+        return "FilterMessageContext [consumerGroup=" + consumerGroup + ", msgList=" + msgList + ", mq="
+                + mq + ", arg=" + arg + ", unitMode=" + unitMode + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/hook/FilterMessageHook.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/hook/FilterMessageHook.java b/client/src/main/java/org/apache/rocketmq/client/hook/FilterMessageHook.java
new file mode 100644
index 0000000..016ff56
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/hook/FilterMessageHook.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.client.hook;
+
+/**
+ * Hook that is given a {@link FilterMessageContext}.
+ *
+ * @author manhong.yqd
+ */
+public interface FilterMessageHook {
+    /**
+     * @return the name identifying this hook
+     */
+    String hookName();
+
+    /**
+     * Callback receiving the filter context.
+     *
+     * @param context the filter context
+     */
+    void filterMessage(FilterMessageContext context);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/hook/SendMessageContext.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/hook/SendMessageContext.java b/client/src/main/java/org/apache/rocketmq/client/hook/SendMessageContext.java
new file mode 100644
index 0000000..bfb4a47
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/hook/SendMessageContext.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.client.hook;
+
+import org.apache.rocketmq.client.impl.CommunicationMode;
+import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.message.MessageType;
+
+import java.util.Map;
+
+
+/**
+ * Mutable context object passed to the {@link SendMessageHook} callbacks.
+ * <p>
+ * Plain holder: each field has an unvalidated getter/setter pair. Reference fields are
+ * {@code null} until set, except {@code msgType}, which starts as
+ * {@link MessageType#Normal_Msg}.
+ */
+public class SendMessageContext {
+    private String producerGroup;
+    private Message message;
+    private MessageQueue mq;
+    private String brokerAddr;
+    private String bornHost;
+    private CommunicationMode communicationMode;
+    private SendResult sendResult;
+    private Exception exception;
+    private Object mqTraceContext;
+    private Map<String, String> props;
+    private DefaultMQProducerImpl producer;
+    private MessageType msgType = MessageType.Normal_Msg;
+
+    // Accessors below follow the field declaration order.
+
+    public String getProducerGroup() {
+        return this.producerGroup;
+    }
+
+    public void setProducerGroup(String producerGroup) {
+        this.producerGroup = producerGroup;
+    }
+
+    public Message getMessage() {
+        return this.message;
+    }
+
+    public void setMessage(Message message) {
+        this.message = message;
+    }
+
+    public MessageQueue getMq() {
+        return this.mq;
+    }
+
+    public void setMq(MessageQueue mq) {
+        this.mq = mq;
+    }
+
+    public String getBrokerAddr() {
+        return this.brokerAddr;
+    }
+
+    public void setBrokerAddr(String brokerAddr) {
+        this.brokerAddr = brokerAddr;
+    }
+
+    public String getBornHost() {
+        return this.bornHost;
+    }
+
+    public void setBornHost(String bornHost) {
+        this.bornHost = bornHost;
+    }
+
+    public CommunicationMode getCommunicationMode() {
+        return this.communicationMode;
+    }
+
+    public void setCommunicationMode(CommunicationMode communicationMode) {
+        this.communicationMode = communicationMode;
+    }
+
+    public SendResult getSendResult() {
+        return this.sendResult;
+    }
+
+    public void setSendResult(SendResult sendResult) {
+        this.sendResult = sendResult;
+    }
+
+    public Exception getException() {
+        return this.exception;
+    }
+
+    public void setException(Exception exception) {
+        this.exception = exception;
+    }
+
+    public Object getMqTraceContext() {
+        return this.mqTraceContext;
+    }
+
+    public void setMqTraceContext(Object mqTraceContext) {
+        this.mqTraceContext = mqTraceContext;
+    }
+
+    public Map<String, String> getProps() {
+        return this.props;
+    }
+
+    public void setProps(Map<String, String> props) {
+        this.props = props;
+    }
+
+    public DefaultMQProducerImpl getProducer() {
+        return this.producer;
+    }
+
+    public void setProducer(final DefaultMQProducerImpl producer) {
+        this.producer = producer;
+    }
+
+    public MessageType getMsgType() {
+        return this.msgType;
+    }
+
+    public void setMsgType(final MessageType msgType) {
+        this.msgType = msgType;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/hook/SendMessageHook.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/hook/SendMessageHook.java b/client/src/main/java/org/apache/rocketmq/client/hook/SendMessageHook.java
new file mode 100644
index 0000000..c040831
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/hook/SendMessageHook.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.client.hook;
+
+/**
+ * Hook with a pair of callbacks that receive a {@link SendMessageContext}.
+ */
+public interface SendMessageHook {
+    /**
+     * @return the name identifying this hook
+     */
+    String hookName();
+
+    /**
+     * "Before" callback of the pair.
+     *
+     * @param context the send context
+     */
+    void sendMessageBefore(SendMessageContext context);
+
+    /**
+     * "After" callback of the pair.
+     *
+     * @param context the send context
+     */
+    void sendMessageAfter(SendMessageContext context);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java
new file mode 100644
index 0000000..50e9b45
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.client.impl;
+
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.client.impl.producer.MQProducerInner;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
+import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
+import org.apache.rocketmq.common.protocol.body.GetConsumerStatusBody;
+import org.apache.rocketmq.common.protocol.body.ResetOffsetBody;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.rocketmq.common.protocol.header.*;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class ClientRemotingProcessor implements NettyRequestProcessor {
+    private final Logger log = ClientLogger.getLog();
+    private final MQClientInstance mqClientFactory;
+
+
+    /**
+     * Creates a processor bound to the given client instance.
+     *
+     * @param mqClientFactory the client instance the request handlers of this class delegate to
+     */
+    public ClientRemotingProcessor(final MQClientInstance mqClientFactory) {
+        this.mqClientFactory = mqClientFactory;
+    }
+
+
+    /**
+     * Dispatches an incoming request to the handler that matches its request code.
+     *
+     * @param ctx     the channel context the request arrived on
+     * @param request the incoming command
+     * @return whatever the selected handler returns, or {@code null} when the code is not one of
+     *         the six handled here
+     * @throws RemotingCommandException propagated from the selected handler
+     */
+    @Override
+    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        final int requestCode = request.getCode();
+        switch (requestCode) {
+            case RequestCode.CHECK_TRANSACTION_STATE:
+                return this.checkTransactionState(ctx, request);
+            case RequestCode.NOTIFY_CONSUMER_IDS_CHANGED:
+                return this.notifyConsumerIdsChanged(ctx, request);
+            case RequestCode.RESET_CONSUMER_CLIENT_OFFSET:
+                return this.resetOffset(ctx, request);
+            case RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT:
+                return this.getConsumeStatus(ctx, request);
+            case RequestCode.GET_CONSUMER_RUNNING_INFO:
+                return this.getConsumerRunningInfo(ctx, request);
+            case RequestCode.CONSUME_MESSAGE_DIRECTLY:
+                return this.consumeMessageDirectly(ctx, request);
+            default:
+                // Unrecognised request code: nothing to do.
+                return null;
+        }
+    }
+
+    /**
+     * Always returns {@code false}.
+     */
+    @Override
+    public boolean rejectRequest() {
+        return false;
+    }
+
+    /**
+     * Handles a check-transaction-state request by decoding the message carried in the request
+     * body and forwarding it to the producer registered for the message's producer group.
+     * <p>
+     * Each failure to proceed (no body, undecodable message, missing group property, no producer
+     * for the group) is logged and the request is otherwise ignored.
+     *
+     * @param ctx     the channel context; its remote address is passed on to the producer
+     * @param request the incoming command whose body holds the encoded message
+     * @return always {@code null}
+     * @throws RemotingCommandException if the custom header cannot be decoded
+     */
+    public RemotingCommand checkTransactionState(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        final CheckTransactionStateRequestHeader requestHeader =
+                (CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);
+
+        // ByteBuffer.wrap(null) throws NullPointerException, so a request without a body
+        // has to be rejected before decoding.
+        final byte[] body = request.getBody();
+        if (body == null) {
+            log.warn("checkTransactionState, request body is null");
+            return null;
+        }
+
+        final MessageExt messageExt = MessageDecoder.decode(ByteBuffer.wrap(body));
+        if (messageExt == null) {
+            log.warn("checkTransactionState, decode message failed");
+            return null;
+        }
+
+        final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
+        if (group == null) {
+            log.warn("checkTransactionState, pick producer group failed");
+            return null;
+        }
+
+        final MQProducerInner producer = this.mqClientFactory.selectProducer(group);
+        if (producer == null) {
+            log.debug("checkTransactionState, pick producer by group[{}] failed", group);
+            return null;
+        }
+
+        final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+        producer.checkTransactionState(addr, messageExt, requestHeader);
+        return null;
+    }
+
+    /**
+     * Handles a consumer-ids-changed notification by logging it and triggering an immediate
+     * rebalance on the client instance.
+     * <p>
+     * Any exception raised while doing so is caught and logged, so this method does not
+     * propagate failures to the caller.
+     *
+     * @param ctx     the channel context; its remote address is included in the log line
+     * @param request the incoming notification command
+     * @return always {@code null}
+     */
+    public RemotingCommand notifyConsumerIdsChanged(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        try {
+            final NotifyConsumerIdsChangedRequestHeader requestHeader =
+                    (NotifyConsumerIdsChangedRequestHeader) request.decodeCommandCustomHeader(NotifyConsumerIdsChangedRequestHeader.class);
+            log.info("receive broker's notification[{}], the consumer group: {} changed, rebalance immediately",
+                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
+                    requestHeader.getConsumerGroup());
+            this.mqClientFactory.rebalanceImmediately();
+        } catch (Exception e) {
+            // The description is a plain String, not a Throwable, so it needs a "{}" placeholder;
+            // without one SLF4J discards the argument and the cause never reaches the log.
+            log.error("notifyConsumerIdsChanged exception: {}", RemotingHelper.exceptionSimpleDesc(e));
+        }
+        return null;
+    }
+
+    /**
+     * Handles a reset-offset request: logs it, decodes the offset table from the request body
+     * (an empty table is used when there is no body) and forwards it to the client instance.
+     *
+     * @param ctx     the channel context; its remote address is included in the log line
+     * @param request the incoming command, optionally carrying an encoded {@link ResetOffsetBody}
+     * @return always {@code null}
+     * @throws RemotingCommandException if the custom header cannot be decoded
+     */
+    public RemotingCommand resetOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        final ResetOffsetRequestHeader resetHeader =
+                (ResetOffsetRequestHeader) request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class);
+
+        final String brokerAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+        log.info("invoke reset offset operation from broker. brokerAddr={}, topic={}, group={}, timestamp={}",
+                new Object[]{brokerAddr, resetHeader.getTopic(), resetHeader.getGroup(),
+                        resetHeader.getTimestamp()});
+
+        Map<MessageQueue, Long> offsets = new HashMap<MessageQueue, Long>();
+        if (null != request.getBody()) {
+            final ResetOffsetBody decoded = ResetOffsetBody.decode(request.getBody(), ResetOffsetBody.class);
+            offsets = decoded.getOffsetTable();
+        }
+
+        this.mqClientFactory.resetOffset(resetHeader.getTopic(), resetHeader.getGroup(), offsets);
+        return null;
+    }
+
+    /**
+     * Builds a response containing the per-queue table that the client factory reports for
+     * the topic and group named in the request header.
+     *
+     * @param ctx     channel context of the requesting peer (not used by this method)
+     * @param request command carrying a {@code GetConsumerStatusRequestHeader}
+     * @return a {@code SUCCESS} response whose body is the encoded {@code GetConsumerStatusBody}
+     * @throws RemotingCommandException if the custom header cannot be decoded
+     */
+    @Deprecated
+    public RemotingCommand getConsumeStatus(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        final GetConsumerStatusRequestHeader header =
+                (GetConsumerStatusRequestHeader) request.decodeCommandCustomHeader(GetConsumerStatusRequestHeader.class);
+
+        final GetConsumerStatusBody statusBody = new GetConsumerStatusBody();
+        statusBody.setMessageQueueTable(
+                this.mqClientFactory.getConsumerStatus(header.getTopic(), header.getGroup()));
+
+        response.setBody(statusBody.encode());
+        response.setCode(ResponseCode.SUCCESS);
+        return response;
+    }
+
+    /**
+     * Answers a request for the running information of a consumer group.
+     * <p>
+     * When the client factory has no running info for the group, the response is
+     * {@code SYSTEM_ERROR} with an explanatory remark. Otherwise the info is encoded into the
+     * response body. If the header asks for it, a dump of all thread stack traces is attached
+     * to the info first.
+     *
+     * @param ctx     channel context of the requesting peer (not used by this method)
+     * @param request command carrying a {@code GetConsumerRunningInfoRequestHeader}
+     * @return the response command, either {@code SUCCESS} with a body or {@code SYSTEM_ERROR}
+     * @throws RemotingCommandException if the custom header cannot be decoded
+     */
+    private RemotingCommand getConsumerRunningInfo(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        final GetConsumerRunningInfoRequestHeader header =
+                (GetConsumerRunningInfoRequestHeader) request.decodeCommandCustomHeader(GetConsumerRunningInfoRequestHeader.class);
+
+        final ConsumerRunningInfo runningInfo = this.mqClientFactory.consumerRunningInfo(header.getConsumerGroup());
+        if (runningInfo == null) {
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark(String.format("The Consumer Group <%s> not exist in this consumer", header.getConsumerGroup()));
+            return response;
+        }
+
+        if (header.isJstackEnable()) {
+            // Attach a stack dump of every live thread when the caller asked for it.
+            runningInfo.setJstack(UtilAll.jstack(Thread.getAllStackTraces()));
+        }
+
+        response.setCode(ResponseCode.SUCCESS);
+        response.setBody(runningInfo.encode());
+        return response;
+    }
+
+    /**
+     * Decodes the message carried in the request body and asks the client factory to consume
+     * it directly for the consumer group and broker name given in the request header.
+     * <p>
+     * A {@code null} result from the client factory yields a {@code SYSTEM_ERROR} response with
+     * an explanatory remark. Any other result is encoded into a {@code SUCCESS} response.
+     *
+     * @param ctx     channel context of the requesting peer (not used by this method)
+     * @param request command carrying a {@code ConsumeMessageDirectlyResultRequestHeader} and
+     *                the encoded message as its body
+     * @return the response command, either {@code SUCCESS} with a body or {@code SYSTEM_ERROR}
+     * @throws RemotingCommandException if the custom header cannot be decoded
+     */
+    private RemotingCommand consumeMessageDirectly(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        final ConsumeMessageDirectlyResultRequestHeader header =
+                (ConsumeMessageDirectlyResultRequestHeader) request
+                        .decodeCommandCustomHeader(ConsumeMessageDirectlyResultRequestHeader.class);
+
+        final ByteBuffer bodyBuffer = ByteBuffer.wrap(request.getBody());
+        final MessageExt message = MessageDecoder.decode(bodyBuffer);
+
+        final ConsumeMessageDirectlyResult directResult =
+                this.mqClientFactory.consumeMessageDirectly(message, header.getConsumerGroup(), header.getBrokerName());
+
+        if (directResult == null) {
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark(String.format("The Consumer Group <%s> not exist in this consumer", header.getConsumerGroup()));
+            return response;
+        }
+
+        response.setCode(ResponseCode.SUCCESS);
+        response.setBody(directResult.encode());
+        return response;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/impl/CommunicationMode.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/CommunicationMode.java b/client/src/main/java/org/apache/rocketmq/client/impl/CommunicationMode.java
new file mode 100644
index 0000000..0f57339
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/CommunicationMode.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.client.impl;
+
+/**
+ * @author shijia.wxr
+ */
+public enum CommunicationMode {
+    SYNC,
+    ASYNC,
+    ONEWAY,
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/impl/FindBrokerResult.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/FindBrokerResult.java b/client/src/main/java/org/apache/rocketmq/client/impl/FindBrokerResult.java
new file mode 100644
index 0000000..56528ef
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/FindBrokerResult.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.client.impl;
+
+/**
+ * Immutable holder pairing a broker address with a flag that tells whether that broker
+ * is a slave.
+ *
+ * @author shijia.wxr
+ */
+public class FindBrokerResult {
+    private final boolean slave;
+    private final String brokerAddr;
+
+    /**
+     * Creates a result from the given values; both are stored as-is.
+     *
+     * @param brokerAddr address of the broker
+     * @param slave      {@code true} when the broker is a slave
+     */
+    public FindBrokerResult(String brokerAddr, boolean slave) {
+        this.slave = slave;
+        this.brokerAddr = brokerAddr;
+    }
+
+    /**
+     * @return the slave flag passed to the constructor
+     */
+    public boolean isSlave() {
+        return this.slave;
+    }
+
+    /**
+     * @return the broker address passed to the constructor
+     */
+    public String getBrokerAddr() {
+        return this.brokerAddr;
+    }
+}