You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/19 09:40:39 UTC

[22/43] incubator-rocketmq git commit: Finish code dump. Reviewed by: @yukon @vongosling @stevenschew @vintagewang @lollipop @zander

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumerConnection.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumerConnection.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumerConnection.java
new file mode 100644
index 0000000..f74c6fc
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumerConnection.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.protocol.body;
+
+import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
+import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType;
+import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
+import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.HashSet;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class ConsumerConnection extends RemotingSerializable {
+    private HashSet<Connection> connectionSet = new HashSet<Connection>();
+    private ConcurrentHashMap<String/* Topic */, SubscriptionData> subscriptionTable =
+            new ConcurrentHashMap<String, SubscriptionData>();
+    private ConsumeType consumeType;
+    private MessageModel messageModel;
+    private ConsumeFromWhere consumeFromWhere;
+
+
+    public int computeMinVersion() {
+        int minVersion = Integer.MAX_VALUE;
+        for (Connection c : this.connectionSet) {
+            if (c.getVersion() < minVersion) {
+                minVersion = c.getVersion();
+            }
+        }
+
+        return minVersion;
+    }
+
+
+    public HashSet<Connection> getConnectionSet() {
+        return connectionSet;
+    }
+
+
+    public void setConnectionSet(HashSet<Connection> connectionSet) {
+        this.connectionSet = connectionSet;
+    }
+
+
+    public ConcurrentHashMap<String, SubscriptionData> getSubscriptionTable() {
+        return subscriptionTable;
+    }
+
+
+    public void setSubscriptionTable(ConcurrentHashMap<String, SubscriptionData> subscriptionTable) {
+        this.subscriptionTable = subscriptionTable;
+    }
+
+
+    public ConsumeType getConsumeType() {
+        return consumeType;
+    }
+
+
+    public void setConsumeType(ConsumeType consumeType) {
+        this.consumeType = consumeType;
+    }
+
+
+    public MessageModel getMessageModel() {
+        return messageModel;
+    }
+
+
+    public void setMessageModel(MessageModel messageModel) {
+        this.messageModel = messageModel;
+    }
+
+
+    public ConsumeFromWhere getConsumeFromWhere() {
+        return consumeFromWhere;
+    }
+
+
+    public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) {
+        this.consumeFromWhere = consumeFromWhere;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumerOffsetSerializeWrapper.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumerOffsetSerializeWrapper.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumerOffsetSerializeWrapper.java
new file mode 100644
index 0000000..7af4e7a
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumerOffsetSerializeWrapper.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.protocol.body;
+
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * @author manhong.yqd
+ */
+public class ConsumerOffsetSerializeWrapper extends RemotingSerializable {
+    private ConcurrentHashMap<String/* topic@group */, ConcurrentHashMap<Integer, Long>> offsetTable =
+            new ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>>(512);
+
+
+    public ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> getOffsetTable() {
+        return offsetTable;
+    }
+
+
+    public void setOffsetTable(ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> offsetTable) {
+        this.offsetTable = offsetTable;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumerRunningInfo.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumerRunningInfo.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumerRunningInfo.java
new file mode 100644
index 0000000..56babc2
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumerRunningInfo.java
@@ -0,0 +1,315 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.protocol.body;
+
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType;
+import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.*;
+import java.util.Map.Entry;
+
+public class ConsumerRunningInfo extends RemotingSerializable {
+    public static final String PROP_NAMESERVER_ADDR = "PROP_NAMESERVER_ADDR";
+    public static final String PROP_THREADPOOL_CORE_SIZE = "PROP_THREADPOOL_CORE_SIZE";
+    public static final String PROP_CONSUME_ORDERLY = "PROP_CONSUMEORDERLY";
+    public static final String PROP_CONSUME_TYPE = "PROP_CONSUME_TYPE";
+    public static final String PROP_CLIENT_VERSION = "PROP_CLIENT_VERSION";
+    public static final String PROP_CONSUMER_START_TIMESTAMP = "PROP_CONSUMER_START_TIMESTAMP";
+
+
+    private Properties properties = new Properties();
+
+    private TreeSet<SubscriptionData> subscriptionSet = new TreeSet<SubscriptionData>();
+
+    private TreeMap<MessageQueue, ProcessQueueInfo> mqTable = new TreeMap<MessageQueue, ProcessQueueInfo>();
+
+    private TreeMap<String/* Topic */, ConsumeStatus> statusTable = new TreeMap<String, ConsumeStatus>();
+
+    private String jstack;
+
+    public static boolean analyzeSubscription(final TreeMap<String/* clientId */, ConsumerRunningInfo> criTable) {
+        ConsumerRunningInfo prev = criTable.firstEntry().getValue();
+
+        boolean push = false;
+        {
+            String property = prev.getProperties().getProperty(ConsumerRunningInfo.PROP_CONSUME_TYPE);
+
+            if (property == null) {
+                property = ((ConsumeType) prev.getProperties().get(ConsumerRunningInfo.PROP_CONSUME_TYPE)).name();
+            }
+            push = ConsumeType.valueOf(property) == ConsumeType.CONSUME_PASSIVELY;
+        }
+
+        boolean startForAWhile = false;
+        {
+
+            String property = prev.getProperties().getProperty(ConsumerRunningInfo.PROP_CONSUMER_START_TIMESTAMP);
+            if (property == null) {
+                property = String.valueOf(prev.getProperties().get(ConsumerRunningInfo.PROP_CONSUMER_START_TIMESTAMP));
+            }
+            startForAWhile = (System.currentTimeMillis() - Long.parseLong(property)) > (1000 * 60 * 2);
+        }
+
+        if (push && startForAWhile) {
+
+            {
+                Iterator<Entry<String, ConsumerRunningInfo>> it = criTable.entrySet().iterator();
+                while (it.hasNext()) {
+                    Entry<String, ConsumerRunningInfo> next = it.next();
+                    ConsumerRunningInfo current = next.getValue();
+                    boolean equals = current.getSubscriptionSet().equals(prev.getSubscriptionSet());
+
+                    if (!equals) {
+                        // Different subscription in the same group of consumer
+                        return false;
+                    }
+
+                    prev = next.getValue();
+                }
+
+                if (prev != null) {
+
+                    if (prev.getSubscriptionSet().isEmpty()) {
+                        // Subscription empty!
+                        return false;
+                    }
+                }
+            }
+        }
+
+        return true;
+    }
+
+    public Properties getProperties() {
+        return properties;
+    }
+
+    public void setProperties(Properties properties) {
+        this.properties = properties;
+    }
+
+    public TreeSet<SubscriptionData> getSubscriptionSet() {
+        return subscriptionSet;
+    }
+
+    public void setSubscriptionSet(TreeSet<SubscriptionData> subscriptionSet) {
+        this.subscriptionSet = subscriptionSet;
+    }
+
+    public static boolean analyzeRebalance(final TreeMap<String/* clientId */, ConsumerRunningInfo> criTable) {
+        return true;
+    }
+
+    public static String analyzeProcessQueue(final String clientId, ConsumerRunningInfo info) {
+        StringBuilder sb = new StringBuilder();
+        boolean push = false;
+        {
+            String property = info.getProperties().getProperty(ConsumerRunningInfo.PROP_CONSUME_TYPE);
+
+            if (property == null) {
+                property = ((ConsumeType) info.getProperties().get(ConsumerRunningInfo.PROP_CONSUME_TYPE)).name();
+            }
+            push = ConsumeType.valueOf(property) == ConsumeType.CONSUME_PASSIVELY;
+        }
+
+        boolean orderMsg = false;
+        {
+            String property = info.getProperties().getProperty(ConsumerRunningInfo.PROP_CONSUME_ORDERLY);
+            orderMsg = Boolean.parseBoolean(property);
+        }
+
+        if (push) {
+            Iterator<Entry<MessageQueue, ProcessQueueInfo>> it = info.getMqTable().entrySet().iterator();
+            while (it.hasNext()) {
+                Entry<MessageQueue, ProcessQueueInfo> next = it.next();
+                MessageQueue mq = next.getKey();
+                ProcessQueueInfo pq = next.getValue();
+
+
+                if (orderMsg) {
+
+                    if (!pq.isLocked()) {
+                        sb.append(String.format("%s %s can't lock for a while, %dms%n", //
+                                clientId, //
+                                mq, //
+                                System.currentTimeMillis() - pq.getLastLockTimestamp()));
+                    } else {
+                        if (pq.isDroped() && (pq.getTryUnlockTimes() > 0)) {
+                            sb.append(String.format("%s %s unlock %d times, still failed%n", //
+                                    clientId, //
+                                    mq, //
+                                    pq.getTryUnlockTimes()));
+                        }
+                    }
+
+
+                } else {
+                    long diff = System.currentTimeMillis() - pq.getLastConsumeTimestamp();
+
+                    if (diff > (1000 * 60) && pq.getCachedMsgCount() > 0) {
+                        sb.append(String.format("%s %s can't consume for a while, maybe blocked, %dms%n", //
+                                clientId, //
+                                mq, //
+                                diff));
+                    }
+                }
+            }
+        }
+
+        return sb.toString();
+    }
+
+    public TreeMap<MessageQueue, ProcessQueueInfo> getMqTable() {
+        return mqTable;
+    }
+
+    public void setMqTable(TreeMap<MessageQueue, ProcessQueueInfo> mqTable) {
+        this.mqTable = mqTable;
+    }
+
+    public TreeMap<String, ConsumeStatus> getStatusTable() {
+        return statusTable;
+    }
+
+    public void setStatusTable(TreeMap<String, ConsumeStatus> statusTable) {
+        this.statusTable = statusTable;
+    }
+
+    public String formatString() {
+        StringBuilder sb = new StringBuilder();
+
+        {
+            sb.append("#Consumer Properties#\n");
+            Iterator<Entry<Object, Object>> it = this.properties.entrySet().iterator();
+            while (it.hasNext()) {
+                Entry<Object, Object> next = it.next();
+                String item = String.format("%-40s: %s%n", next.getKey().toString(), next.getValue().toString());
+                sb.append(item);
+            }
+        }
+
+        {
+            sb.append("\n\n#Consumer Subscription#\n");
+
+            Iterator<SubscriptionData> it = this.subscriptionSet.iterator();
+            int i = 0;
+            while (it.hasNext()) {
+                SubscriptionData next = it.next();
+                String item = String.format("%03d Topic: %-40s ClassFilter: %-8s SubExpression: %s%n", //
+                        ++i, //
+                        next.getTopic(), //
+                        next.isClassFilterMode(), //
+                        next.getSubString());
+
+                sb.append(item);
+            }
+        }
+
+        {
+            sb.append("\n\n#Consumer Offset#\n");
+            sb.append(String.format("%-32s  %-32s  %-4s  %-20s%n", //
+                    "#Topic", //
+                    "#Broker Name", //
+                    "#QID", //
+                    "#Consumer Offset"//
+            ));
+
+            Iterator<Entry<MessageQueue, ProcessQueueInfo>> it = this.mqTable.entrySet().iterator();
+            while (it.hasNext()) {
+                Entry<MessageQueue, ProcessQueueInfo> next = it.next();
+                String item = String.format("%-32s  %-32s  %-4d  %-20d%n", //
+                        next.getKey().getTopic(), //
+                        next.getKey().getBrokerName(), //
+                        next.getKey().getQueueId(), //
+                        next.getValue().getCommitOffset());
+
+                sb.append(item);
+            }
+        }
+
+        {
+            sb.append("\n\n#Consumer MQ Detail#\n");
+            sb.append(String.format("%-32s  %-32s  %-4s  %-20s%n", //
+                    "#Topic", //
+                    "#Broker Name", //
+                    "#QID", //
+                    "#ProcessQueueInfo"//
+            ));
+
+            Iterator<Entry<MessageQueue, ProcessQueueInfo>> it = this.mqTable.entrySet().iterator();
+            while (it.hasNext()) {
+                Entry<MessageQueue, ProcessQueueInfo> next = it.next();
+                String item = String.format("%-32s  %-32s  %-4d  %s%n", //
+                        next.getKey().getTopic(), //
+                        next.getKey().getBrokerName(), //
+                        next.getKey().getQueueId(), //
+                        next.getValue().toString());
+
+                sb.append(item);
+            }
+        }
+
+        {
+            sb.append("\n\n#Consumer RT&TPS#\n");
+            sb.append(String.format("%-32s  %14s %14s %14s %14s %18s %25s%n", //
+                    "#Topic", //
+                    "#Pull RT", //
+                    "#Pull TPS", //
+                    "#Consume RT", //
+                    "#ConsumeOK TPS", //
+                    "#ConsumeFailed TPS", //
+                    "#ConsumeFailedMsgsInHour"//
+            ));
+
+            Iterator<Entry<String, ConsumeStatus>> it = this.statusTable.entrySet().iterator();
+            while (it.hasNext()) {
+                Entry<String, ConsumeStatus> next = it.next();
+                String item = String.format("%-32s  %14.2f %14.2f %14.2f %14.2f %18.2f %25d%n", //
+                        next.getKey(), //
+                        next.getValue().getPullRT(), //
+                        next.getValue().getPullTPS(), //
+                        next.getValue().getConsumeRT(), //
+                        next.getValue().getConsumeOKTPS(), //
+                        next.getValue().getConsumeFailedTPS(), //
+                        next.getValue().getConsumeFailedMsgs()//
+                );
+
+                sb.append(item);
+            }
+        }
+
+        if (this.jstack != null) {
+            sb.append("\n\n#Consumer jstack#\n");
+            sb.append(this.jstack);
+        }
+
+        return sb.toString();
+    }
+
+    public String getJstack() {
+        return jstack;
+    }
+
+
+    public void setJstack(String jstack) {
+        this.jstack = jstack;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/GetConsumerStatusBody.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/GetConsumerStatusBody.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/GetConsumerStatusBody.java
new file mode 100644
index 0000000..ca84f21
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/GetConsumerStatusBody.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.protocol.body;
+
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * @author manhong.yqd
+ */
+@Deprecated
+public class GetConsumerStatusBody extends RemotingSerializable {
+    private Map<MessageQueue, Long> messageQueueTable = new HashMap<MessageQueue, Long>();
+    private Map<String, Map<MessageQueue, Long>> consumerTable =
+            new HashMap<String, Map<MessageQueue, Long>>();
+
+
+    public Map<MessageQueue, Long> getMessageQueueTable() {
+        return messageQueueTable;
+    }
+
+
+    public void setMessageQueueTable(Map<MessageQueue, Long> messageQueueTable) {
+        this.messageQueueTable = messageQueueTable;
+    }
+
+
+    public Map<String, Map<MessageQueue, Long>> getConsumerTable() {
+        return consumerTable;
+    }
+
+
+    public void setConsumerTable(Map<String, Map<MessageQueue, Long>> consumerTable) {
+        this.consumerTable = consumerTable;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/GroupList.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/GroupList.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/GroupList.java
new file mode 100644
index 0000000..9f7e500
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/GroupList.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.common.protocol.body;
+
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.HashSet;
+
+
+/**
+ * @author shijia.wxr
+ *
+ */
+public class GroupList extends RemotingSerializable {
+    private HashSet<String> groupList = new HashSet<String>();
+
+
+    public HashSet<String> getGroupList() {
+        return groupList;
+    }
+
+
+    public void setGroupList(HashSet<String> groupList) {
+        this.groupList = groupList;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/KVTable.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/KVTable.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/KVTable.java
new file mode 100644
index 0000000..41cfcb8
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/KVTable.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.common.protocol.body;
+
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.HashMap;
+
+
+/**
+ * @author shijia.wxr
+ *
+ */
+public class KVTable extends RemotingSerializable {
+    private HashMap<String, String> table = new HashMap<String, String>();
+
+
+    public HashMap<String, String> getTable() {
+        return table;
+    }
+
+
+    public void setTable(HashMap<String, String> table) {
+        this.table = table;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/LockBatchRequestBody.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/LockBatchRequestBody.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/LockBatchRequestBody.java
new file mode 100644
index 0000000..992f656
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/LockBatchRequestBody.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.protocol.body;
+
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.HashSet;
+import java.util.Set;
+
+
+/**
+ * @author shijia.wxr
+ *
+ */
+public class LockBatchRequestBody extends RemotingSerializable {
+    private String consumerGroup;
+    private String clientId;
+    private Set<MessageQueue> mqSet = new HashSet<MessageQueue>();
+
+
+    public String getConsumerGroup() {
+        return consumerGroup;
+    }
+
+
+    public void setConsumerGroup(String consumerGroup) {
+        this.consumerGroup = consumerGroup;
+    }
+
+
+    public String getClientId() {
+        return clientId;
+    }
+
+
+    public void setClientId(String clientId) {
+        this.clientId = clientId;
+    }
+
+
+    public Set<MessageQueue> getMqSet() {
+        return mqSet;
+    }
+
+
+    public void setMqSet(Set<MessageQueue> mqSet) {
+        this.mqSet = mqSet;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/LockBatchResponseBody.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/LockBatchResponseBody.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/LockBatchResponseBody.java
new file mode 100644
index 0000000..12f6c76
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/LockBatchResponseBody.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.protocol.body;
+
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.HashSet;
+import java.util.Set;
+
+
+/**
+ * @author shijia.wxr
+ *
+ */
+public class LockBatchResponseBody extends RemotingSerializable {
+
+    private Set<MessageQueue> lockOKMQSet = new HashSet<MessageQueue>();
+
+
+    public Set<MessageQueue> getLockOKMQSet() {
+        return lockOKMQSet;
+    }
+
+
+    public void setLockOKMQSet(Set<MessageQueue> lockOKMQSet) {
+        this.lockOKMQSet = lockOKMQSet;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ProcessQueueInfo.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ProcessQueueInfo.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ProcessQueueInfo.java
new file mode 100644
index 0000000..6c17443
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ProcessQueueInfo.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.protocol.body;
+
+import com.alibaba.rocketmq.common.UtilAll;
+
+
+public class ProcessQueueInfo {
+    private long commitOffset;
+
+    private long cachedMsgMinOffset;
+    private long cachedMsgMaxOffset;
+    private int cachedMsgCount;
+
+    private long transactionMsgMinOffset;
+    private long transactionMsgMaxOffset;
+    private int transactionMsgCount;
+
+    private boolean locked;
+    private long tryUnlockTimes;
+    private long lastLockTimestamp;
+
+    private boolean droped;
+    private long lastPullTimestamp;
+    private long lastConsumeTimestamp;
+
+
+    public long getCommitOffset() {
+        return commitOffset;
+    }
+
+
+    public void setCommitOffset(long commitOffset) {
+        this.commitOffset = commitOffset;
+    }
+
+
+    public long getCachedMsgMinOffset() {
+        return cachedMsgMinOffset;
+    }
+
+
+    public void setCachedMsgMinOffset(long cachedMsgMinOffset) {
+        this.cachedMsgMinOffset = cachedMsgMinOffset;
+    }
+
+
+    public long getCachedMsgMaxOffset() {
+        return cachedMsgMaxOffset;
+    }
+
+
+    public void setCachedMsgMaxOffset(long cachedMsgMaxOffset) {
+        this.cachedMsgMaxOffset = cachedMsgMaxOffset;
+    }
+
+
+    public int getCachedMsgCount() {
+        return cachedMsgCount;
+    }
+
+
+    public void setCachedMsgCount(int cachedMsgCount) {
+        this.cachedMsgCount = cachedMsgCount;
+    }
+
+
+    public long getTransactionMsgMinOffset() {
+        return transactionMsgMinOffset;
+    }
+
+
+    public void setTransactionMsgMinOffset(long transactionMsgMinOffset) {
+        this.transactionMsgMinOffset = transactionMsgMinOffset;
+    }
+
+
+    public long getTransactionMsgMaxOffset() {
+        return transactionMsgMaxOffset;
+    }
+
+
+    public void setTransactionMsgMaxOffset(long transactionMsgMaxOffset) {
+        this.transactionMsgMaxOffset = transactionMsgMaxOffset;
+    }
+
+
+    public int getTransactionMsgCount() {
+        return transactionMsgCount;
+    }
+
+
+    public void setTransactionMsgCount(int transactionMsgCount) {
+        this.transactionMsgCount = transactionMsgCount;
+    }
+
+
+    public boolean isLocked() {
+        return locked;
+    }
+
+
+    public void setLocked(boolean locked) {
+        this.locked = locked;
+    }
+
+
+    public long getTryUnlockTimes() {
+        return tryUnlockTimes;
+    }
+
+
+    public void setTryUnlockTimes(long tryUnlockTimes) {
+        this.tryUnlockTimes = tryUnlockTimes;
+    }
+
+
+    public long getLastLockTimestamp() {
+        return lastLockTimestamp;
+    }
+
+
+    public void setLastLockTimestamp(long lastLockTimestamp) {
+        this.lastLockTimestamp = lastLockTimestamp;
+    }
+
+
+    public boolean isDroped() {
+        return droped;
+    }
+
+
+    public void setDroped(boolean droped) {
+        this.droped = droped;
+    }
+
+
+    public long getLastPullTimestamp() {
+        return lastPullTimestamp;
+    }
+
+
+    public void setLastPullTimestamp(long lastPullTimestamp) {
+        this.lastPullTimestamp = lastPullTimestamp;
+    }
+
+
+    public long getLastConsumeTimestamp() {
+        return lastConsumeTimestamp;
+    }
+
+
+    public void setLastConsumeTimestamp(long lastConsumeTimestamp) {
+        this.lastConsumeTimestamp = lastConsumeTimestamp;
+    }
+
+
+    @Override
+    public String toString() {
+        return "ProcessQueueInfo [commitOffset=" + commitOffset + ", cachedMsgMinOffset="
+                + cachedMsgMinOffset + ", cachedMsgMaxOffset=" + cachedMsgMaxOffset + ", cachedMsgCount="
+                + cachedMsgCount + ", transactionMsgMinOffset=" + transactionMsgMinOffset
+                + ", transactionMsgMaxOffset=" + transactionMsgMaxOffset + ", transactionMsgCount="
+                + transactionMsgCount + ", locked=" + locked + ", tryUnlockTimes=" + tryUnlockTimes
+                + ", lastLockTimestamp=" + UtilAll.timeMillisToHumanString(lastLockTimestamp) + ", droped="
+                + droped + ", lastPullTimestamp=" + UtilAll.timeMillisToHumanString(lastPullTimestamp)
+                + ", lastConsumeTimestamp=" + UtilAll.timeMillisToHumanString(lastConsumeTimestamp) + "]";
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ProducerConnection.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ProducerConnection.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ProducerConnection.java
new file mode 100644
index 0000000..32fe1d0
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ProducerConnection.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.protocol.body;
+
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.HashSet;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class ProducerConnection extends RemotingSerializable {
+    private HashSet<Connection> connectionSet = new HashSet<Connection>();
+
+
+    public HashSet<Connection> getConnectionSet() {
+        return connectionSet;
+    }
+
+
+    public void setConnectionSet(HashSet<Connection> connectionSet) {
+        this.connectionSet = connectionSet;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/QueryConsumeTimeSpanBody.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/QueryConsumeTimeSpanBody.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/QueryConsumeTimeSpanBody.java
new file mode 100644
index 0000000..2f52666
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/QueryConsumeTimeSpanBody.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.protocol.body;
+
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * @author manhong.yqd
+ */
+public class QueryConsumeTimeSpanBody extends RemotingSerializable {
+    List<QueueTimeSpan> consumeTimeSpanSet = new ArrayList<QueueTimeSpan>();
+
+
+    public List<QueueTimeSpan> getConsumeTimeSpanSet() {
+        return consumeTimeSpanSet;
+    }
+
+
+    public void setConsumeTimeSpanSet(List<QueueTimeSpan> consumeTimeSpanSet) {
+        this.consumeTimeSpanSet = consumeTimeSpanSet;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/QueryCorrectionOffsetBody.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/QueryCorrectionOffsetBody.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/QueryCorrectionOffsetBody.java
new file mode 100644
index 0000000..225b90c
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/QueryCorrectionOffsetBody.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.common.protocol.body;
+
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * @author manhong.yqd
+ */
+public class QueryCorrectionOffsetBody extends RemotingSerializable {
+    private Map<Integer, Long> correctionOffsets = new HashMap<Integer, Long>();
+
+
+    public Map<Integer, Long> getCorrectionOffsets() {
+        return correctionOffsets;
+    }
+
+
+    public void setCorrectionOffsets(Map<Integer, Long> correctionOffsets) {
+        this.correctionOffsets = correctionOffsets;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/QueueTimeSpan.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/QueueTimeSpan.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/QueueTimeSpan.java
new file mode 100644
index 0000000..14001ec
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/QueueTimeSpan.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.protocol.body;
+
+import com.alibaba.rocketmq.common.UtilAll;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+
+import java.util.Date;
+
+
+/**
+ * @author manhong.yqd
+ */
+public class QueueTimeSpan {
+    private MessageQueue messageQueue;
+    private long minTimeStamp;
+    private long maxTimeStamp;
+    private long consumeTimeStamp;
+    private long delayTime;
+
+
+    public MessageQueue getMessageQueue() {
+        return messageQueue;
+    }
+
+
+    public void setMessageQueue(MessageQueue messageQueue) {
+        this.messageQueue = messageQueue;
+    }
+
+
+    public long getMinTimeStamp() {
+        return minTimeStamp;
+    }
+
+
+    public void setMinTimeStamp(long minTimeStamp) {
+        this.minTimeStamp = minTimeStamp;
+    }
+
+
+    public long getMaxTimeStamp() {
+        return maxTimeStamp;
+    }
+
+
+    public void setMaxTimeStamp(long maxTimeStamp) {
+        this.maxTimeStamp = maxTimeStamp;
+    }
+
+
+    public long getConsumeTimeStamp() {
+        return consumeTimeStamp;
+    }
+
+
+    public void setConsumeTimeStamp(long consumeTimeStamp) {
+        this.consumeTimeStamp = consumeTimeStamp;
+    }
+
+
+    public String getMinTimeStampStr() {
+        return UtilAll.formatDate(new Date(minTimeStamp), UtilAll.YYYY_MM_DD_HH_MM_SS_SSS);
+    }
+
+
+    public String getMaxTimeStampStr() {
+        return UtilAll.formatDate(new Date(maxTimeStamp), UtilAll.YYYY_MM_DD_HH_MM_SS_SSS);
+    }
+
+
+    public String getConsumeTimeStampStr() {
+        return UtilAll.formatDate(new Date(consumeTimeStamp), UtilAll.YYYY_MM_DD_HH_MM_SS_SSS);
+    }
+
+
+    public long getDelayTime() {
+        return delayTime;
+    }
+
+
+    public void setDelayTime(long delayTime) {
+        this.delayTime = delayTime;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/RegisterBrokerBody.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/RegisterBrokerBody.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/RegisterBrokerBody.java
new file mode 100644
index 0000000..364bbcb
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/RegisterBrokerBody.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.protocol.body;
+
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+public class RegisterBrokerBody extends RemotingSerializable {
+    private TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
+    private List<String> filterServerList = new ArrayList<String>();
+
+
+    public TopicConfigSerializeWrapper getTopicConfigSerializeWrapper() {
+        return topicConfigSerializeWrapper;
+    }
+
+
+    public void setTopicConfigSerializeWrapper(TopicConfigSerializeWrapper topicConfigSerializeWrapper) {
+        this.topicConfigSerializeWrapper = topicConfigSerializeWrapper;
+    }
+
+
+    public List<String> getFilterServerList() {
+        return filterServerList;
+    }
+
+
+    public void setFilterServerList(List<String> filterServerList) {
+        this.filterServerList = filterServerList;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ResetOffsetBody.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ResetOffsetBody.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ResetOffsetBody.java
new file mode 100644
index 0000000..2122e61
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ResetOffsetBody.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.protocol.body;
+
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.Map;
+
+
+/**
+ * @author manhong.yqd
+ *
+ */
+public class ResetOffsetBody extends RemotingSerializable {
+    private Map<MessageQueue, Long> offsetTable;
+
+
+    public Map<MessageQueue, Long> getOffsetTable() {
+        return offsetTable;
+    }
+
+
+    public void setOffsetTable(Map<MessageQueue, Long> offsetTable) {
+        this.offsetTable = offsetTable;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ResetOffsetBodyForC.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ResetOffsetBodyForC.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ResetOffsetBodyForC.java
new file mode 100644
index 0000000..fb7360e
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ResetOffsetBodyForC.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.protocol.body;
+
+import com.alibaba.rocketmq.common.message.MessageQueueForC;
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.List;
+
+public class ResetOffsetBodyForC extends RemotingSerializable {
+
+    private List<MessageQueueForC> offsetTable;
+
+
+    public List<MessageQueueForC> getOffsetTable() {
+        return offsetTable;
+    }
+
+
+    public void setOffsetTable(List<MessageQueueForC> offsetTable) {
+        this.offsetTable = offsetTable;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/SubscriptionGroupWrapper.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/SubscriptionGroupWrapper.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/SubscriptionGroupWrapper.java
new file mode 100644
index 0000000..096672c
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/SubscriptionGroupWrapper.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.protocol.body;
+
+import com.alibaba.rocketmq.common.DataVersion;
+import com.alibaba.rocketmq.common.subscription.SubscriptionGroupConfig;
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * @author manhong.yqd
+ */
+public class SubscriptionGroupWrapper extends RemotingSerializable {
+    private ConcurrentHashMap<String, SubscriptionGroupConfig> subscriptionGroupTable =
+            new ConcurrentHashMap<String, SubscriptionGroupConfig>(1024);
+    private DataVersion dataVersion = new DataVersion();
+
+
+    public ConcurrentHashMap<String, SubscriptionGroupConfig> getSubscriptionGroupTable() {
+        return subscriptionGroupTable;
+    }
+
+
+    public void setSubscriptionGroupTable(
+            ConcurrentHashMap<String, SubscriptionGroupConfig> subscriptionGroupTable) {
+        this.subscriptionGroupTable = subscriptionGroupTable;
+    }
+
+
+    public DataVersion getDataVersion() {
+        return dataVersion;
+    }
+
+
+    public void setDataVersion(DataVersion dataVersion) {
+        this.dataVersion = dataVersion;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java
new file mode 100644
index 0000000..0050762
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.protocol.body;
+
+import com.alibaba.rocketmq.common.DataVersion;
+import com.alibaba.rocketmq.common.TopicConfig;
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+
+public class TopicConfigSerializeWrapper extends RemotingSerializable {
+    private ConcurrentHashMap<String, TopicConfig> topicConfigTable =
+            new ConcurrentHashMap<String, TopicConfig>();
+    private DataVersion dataVersion = new DataVersion();
+
+
+    public ConcurrentHashMap<String, TopicConfig> getTopicConfigTable() {
+        return topicConfigTable;
+    }
+
+
+    public void setTopicConfigTable(ConcurrentHashMap<String, TopicConfig> topicConfigTable) {
+        this.topicConfigTable = topicConfigTable;
+    }
+
+
+    public DataVersion getDataVersion() {
+        return dataVersion;
+    }
+
+
+    public void setDataVersion(DataVersion dataVersion) {
+        this.dataVersion = dataVersion;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/TopicList.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/TopicList.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/TopicList.java
new file mode 100644
index 0000000..84912ce
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/TopicList.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.common.protocol.body;
+
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.HashSet;
+import java.util.Set;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class TopicList extends RemotingSerializable {
+    private Set<String> topicList = new HashSet<String>();
+    private String brokerAddr;
+
+
+    public Set<String> getTopicList() {
+        return topicList;
+    }
+
+
+    public void setTopicList(Set<String> topicList) {
+        this.topicList = topicList;
+    }
+
+
+    public String getBrokerAddr() {
+        return brokerAddr;
+    }
+
+
+    public void setBrokerAddr(String brokerAddr) {
+        this.brokerAddr = brokerAddr;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/UnlockBatchRequestBody.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/UnlockBatchRequestBody.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/UnlockBatchRequestBody.java
new file mode 100644
index 0000000..542b797
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/UnlockBatchRequestBody.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.protocol.body;
+
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.HashSet;
+import java.util.Set;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class UnlockBatchRequestBody extends RemotingSerializable {
+    private String consumerGroup;
+    private String clientId;
+    private Set<MessageQueue> mqSet = new HashSet<MessageQueue>();
+
+
+    public String getConsumerGroup() {
+        return consumerGroup;
+    }
+
+
+    public void setConsumerGroup(String consumerGroup) {
+        this.consumerGroup = consumerGroup;
+    }
+
+
+    public String getClientId() {
+        return clientId;
+    }
+
+
+    public void setClientId(String clientId) {
+        this.clientId = clientId;
+    }
+
+
+    public Set<MessageQueue> getMqSet() {
+        return mqSet;
+    }
+
+
+    public void setMqSet(Set<MessageQueue> mqSet) {
+        this.mqSet = mqSet;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/CheckTransactionStateRequestHeader.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/CheckTransactionStateRequestHeader.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/CheckTransactionStateRequestHeader.java
new file mode 100644
index 0000000..37d6a7f
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/CheckTransactionStateRequestHeader.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * $Id: EndTransactionRequestHeader.java 1835 2013-05-16 02:00:50Z shijia.wxr $
+ */
+package com.alibaba.rocketmq.common.protocol.header;
+
+import com.alibaba.rocketmq.remoting.CommandCustomHeader;
+import com.alibaba.rocketmq.remoting.annotation.CFNotNull;
+import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class CheckTransactionStateRequestHeader implements CommandCustomHeader {
+    @CFNotNull
+    private Long tranStateTableOffset;
+    @CFNotNull
+    private Long commitLogOffset;
+    private String msgId;
+    private String transactionId;
+
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+    }
+
+
+    public Long getTranStateTableOffset() {
+        return tranStateTableOffset;
+    }
+
+
+    public void setTranStateTableOffset(Long tranStateTableOffset) {
+        this.tranStateTableOffset = tranStateTableOffset;
+    }
+
+
+    public Long getCommitLogOffset() {
+        return commitLogOffset;
+    }
+
+
+    public void setCommitLogOffset(Long commitLogOffset) {
+        this.commitLogOffset = commitLogOffset;
+    }
+
+    public String getMsgId() {
+        return msgId;
+    }
+
+    public void setMsgId(String msgId) {
+        this.msgId = msgId;
+    }
+
+    public String getTransactionId() {
+        return transactionId;
+    }
+
+    public void setTransactionId(String transactionId) {
+        this.transactionId = transactionId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/CheckTransactionStateResponseHeader.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/CheckTransactionStateResponseHeader.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/CheckTransactionStateResponseHeader.java
new file mode 100644
index 0000000..76c9732
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/CheckTransactionStateResponseHeader.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * $Id: EndTransactionResponseHeader.java 1835 2013-05-16 02:00:50Z shijia.wxr $
+ */
+package com.alibaba.rocketmq.common.protocol.header;
+
+import com.alibaba.rocketmq.common.sysflag.MessageSysFlag;
+import com.alibaba.rocketmq.remoting.CommandCustomHeader;
+import com.alibaba.rocketmq.remoting.annotation.CFNotNull;
+import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class CheckTransactionStateResponseHeader implements CommandCustomHeader {
+    @CFNotNull
+    private String producerGroup;
+    @CFNotNull
+    private Long tranStateTableOffset;
+    @CFNotNull
+    private Long commitLogOffset;
+    @CFNotNull
+    private Integer commitOrRollback; // TRANSACTION_COMMIT_TYPE
+
+
+    // TRANSACTION_ROLLBACK_TYPE
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+        if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == this.commitOrRollback) {
+            return;
+        }
+
+        if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == this.commitOrRollback) {
+            return;
+        }
+
+        throw new RemotingCommandException("commitOrRollback field wrong");
+    }
+
+
+    public String getProducerGroup() {
+        return producerGroup;
+    }
+
+
+    public void setProducerGroup(String producerGroup) {
+        this.producerGroup = producerGroup;
+    }
+
+
+    public Long getTranStateTableOffset() {
+        return tranStateTableOffset;
+    }
+
+
+    public void setTranStateTableOffset(Long tranStateTableOffset) {
+        this.tranStateTableOffset = tranStateTableOffset;
+    }
+
+
+    public Long getCommitLogOffset() {
+        return commitLogOffset;
+    }
+
+
+    public void setCommitLogOffset(Long commitLogOffset) {
+        this.commitLogOffset = commitLogOffset;
+    }
+
+
+    public Integer getCommitOrRollback() {
+        return commitOrRollback;
+    }
+
+
+    public void setCommitOrRollback(Integer commitOrRollback) {
+        this.commitOrRollback = commitOrRollback;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/CloneGroupOffsetRequestHeader.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/CloneGroupOffsetRequestHeader.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/CloneGroupOffsetRequestHeader.java
new file mode 100644
index 0000000..6043229
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/CloneGroupOffsetRequestHeader.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * $Id: DeleteTopicRequestHeader.java 1835 2013-05-16 02:00:50Z shijia.wxr $
+ */
+package com.alibaba.rocketmq.common.protocol.header;
+
+import com.alibaba.rocketmq.remoting.CommandCustomHeader;
+import com.alibaba.rocketmq.remoting.annotation.CFNotNull;
+import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
+
+
+/**
+ * @author manhong.yqd
+ */
+public class CloneGroupOffsetRequestHeader implements CommandCustomHeader {
+    @CFNotNull
+    private String srcGroup;
+    @CFNotNull
+    private String destGroup;
+    private String topic;
+    private boolean offline;
+
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+    }
+
+
+    public String getDestGroup() {
+        return destGroup;
+    }
+
+
+    public void setDestGroup(String destGroup) {
+        this.destGroup = destGroup;
+    }
+
+
+    public String getTopic() {
+        return topic;
+    }
+
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+
+    public String getSrcGroup() {
+
+        return srcGroup;
+    }
+
+
+    public void setSrcGroup(String srcGroup) {
+        this.srcGroup = srcGroup;
+    }
+
+
+    public boolean isOffline() {
+        return offline;
+    }
+
+
+    public void setOffline(boolean offline) {
+        this.offline = offline;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/ConsumeMessageDirectlyResultRequestHeader.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/ConsumeMessageDirectlyResultRequestHeader.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/ConsumeMessageDirectlyResultRequestHeader.java
new file mode 100644
index 0000000..3c68636
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/ConsumeMessageDirectlyResultRequestHeader.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.protocol.header;
+
+import com.alibaba.rocketmq.remoting.CommandCustomHeader;
+import com.alibaba.rocketmq.remoting.annotation.CFNotNull;
+import com.alibaba.rocketmq.remoting.annotation.CFNullable;
+import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
+
+
+public class ConsumeMessageDirectlyResultRequestHeader implements CommandCustomHeader {
+    @CFNotNull
+    private String consumerGroup;
+    @CFNullable
+    private String clientId;
+    @CFNullable
+    private String msgId;
+    @CFNullable
+    private String brokerName;
+
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+    }
+
+
+    public String getConsumerGroup() {
+        return consumerGroup;
+    }
+
+
+    public void setConsumerGroup(String consumerGroup) {
+        this.consumerGroup = consumerGroup;
+    }
+
+
+    public String getBrokerName() {
+        return brokerName;
+    }
+
+
+    public void setBrokerName(String brokerName) {
+        this.brokerName = brokerName;
+    }
+
+
+    public String getClientId() {
+        return clientId;
+    }
+
+
+    public void setClientId(String clientId) {
+        this.clientId = clientId;
+    }
+
+
+    public String getMsgId() {
+        return msgId;
+    }
+
+
+    public void setMsgId(String msgId) {
+        this.msgId = msgId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/ConsumerSendMsgBackRequestHeader.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/ConsumerSendMsgBackRequestHeader.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/ConsumerSendMsgBackRequestHeader.java
new file mode 100644
index 0000000..c0acf88
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/ConsumerSendMsgBackRequestHeader.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.protocol.header;
+
+import com.alibaba.rocketmq.remoting.CommandCustomHeader;
+import com.alibaba.rocketmq.remoting.annotation.CFNotNull;
+import com.alibaba.rocketmq.remoting.annotation.CFNullable;
+import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
+
+
+/**
+ * @author shijia.wxr
+ *
+ */
+public class ConsumerSendMsgBackRequestHeader implements CommandCustomHeader {
+    @CFNotNull
+    private Long offset;
+    @CFNotNull
+    private String group;
+    @CFNotNull
+    private Integer delayLevel;
+    private String originMsgId;
+    private String originTopic;
+    @CFNullable
+    private boolean unitMode = false;
+    private Integer maxReconsumeTimes;
+
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+
+    }
+
+
+    public Long getOffset() {
+        return offset;
+    }
+
+
+    public void setOffset(Long offset) {
+        this.offset = offset;
+    }
+
+
+    public String getGroup() {
+        return group;
+    }
+
+
+    public void setGroup(String group) {
+        this.group = group;
+    }
+
+
+    public Integer getDelayLevel() {
+        return delayLevel;
+    }
+
+
+    public void setDelayLevel(Integer delayLevel) {
+        this.delayLevel = delayLevel;
+    }
+
+
+    public String getOriginMsgId() {
+        return originMsgId;
+    }
+
+
+    public void setOriginMsgId(String originMsgId) {
+        this.originMsgId = originMsgId;
+    }
+
+
+    public String getOriginTopic() {
+        return originTopic;
+    }
+
+
+    public void setOriginTopic(String originTopic) {
+        this.originTopic = originTopic;
+    }
+
+
+    public boolean isUnitMode() {
+        return unitMode;
+    }
+
+
+    public void setUnitMode(boolean unitMode) {
+        this.unitMode = unitMode;
+    }
+
+
+    public Integer getMaxReconsumeTimes() {
+        return maxReconsumeTimes;
+    }
+
+
+    public void setMaxReconsumeTimes(final Integer maxReconsumeTimes) {
+        this.maxReconsumeTimes = maxReconsumeTimes;
+    }
+
+
+    @Override
+    public String toString() {
+        return "ConsumerSendMsgBackRequestHeader [group=" + group + ", originTopic=" + originTopic + ", originMsgId=" + originMsgId
+                + ", delayLevel=" + delayLevel + ", unitMode=" + unitMode + ", maxReconsumeTimes=" + maxReconsumeTimes + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/CreateTopicRequestHeader.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/CreateTopicRequestHeader.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/CreateTopicRequestHeader.java
new file mode 100644
index 0000000..a9d219c
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/CreateTopicRequestHeader.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * $Id: CreateTopicRequestHeader.java 1835 2013-05-16 02:00:50Z shijia.wxr $
+ */
+package com.alibaba.rocketmq.common.protocol.header;
+
+import com.alibaba.rocketmq.common.TopicFilterType;
+import com.alibaba.rocketmq.remoting.CommandCustomHeader;
+import com.alibaba.rocketmq.remoting.annotation.CFNotNull;
+import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class CreateTopicRequestHeader implements CommandCustomHeader {
+    @CFNotNull
+    private String topic;
+    @CFNotNull
+    private String defaultTopic;
+    @CFNotNull
+    private Integer readQueueNums;
+    @CFNotNull
+    private Integer writeQueueNums;
+    @CFNotNull
+    private Integer perm;
+    @CFNotNull
+    private String topicFilterType;
+    private Integer topicSysFlag;
+    @CFNotNull
+    private Boolean order = false;
+
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+        try {
+            TopicFilterType.valueOf(this.topicFilterType);
+        } catch (Exception e) {
+            throw new RemotingCommandException("topicFilterType = [" + topicFilterType + "] value invalid", e);
+        }
+    }
+
+
+    public TopicFilterType getTopicFilterTypeEnum() {
+        return TopicFilterType.valueOf(this.topicFilterType);
+    }
+
+
+    public String getTopic() {
+        return topic;
+    }
+
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+
+    public String getDefaultTopic() {
+        return defaultTopic;
+    }
+
+
+    public void setDefaultTopic(String defaultTopic) {
+        this.defaultTopic = defaultTopic;
+    }
+
+
+    public Integer getReadQueueNums() {
+        return readQueueNums;
+    }
+
+
+    public void setReadQueueNums(Integer readQueueNums) {
+        this.readQueueNums = readQueueNums;
+    }
+
+
+    public Integer getWriteQueueNums() {
+        return writeQueueNums;
+    }
+
+
+    public void setWriteQueueNums(Integer writeQueueNums) {
+        this.writeQueueNums = writeQueueNums;
+    }
+
+
+    public Integer getPerm() {
+        return perm;
+    }
+
+
+    public void setPerm(Integer perm) {
+        this.perm = perm;
+    }
+
+
+    public String getTopicFilterType() {
+        return topicFilterType;
+    }
+
+
+    public void setTopicFilterType(String topicFilterType) {
+        this.topicFilterType = topicFilterType;
+    }
+
+
+    public Integer getTopicSysFlag() {
+        return topicSysFlag;
+    }
+
+
+    public void setTopicSysFlag(Integer topicSysFlag) {
+        this.topicSysFlag = topicSysFlag;
+    }
+
+
+    public Boolean getOrder() {
+        return order;
+    }
+
+
+    public void setOrder(Boolean order) {
+        this.order = order;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/DeleteSubscriptionGroupRequestHeader.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/DeleteSubscriptionGroupRequestHeader.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/DeleteSubscriptionGroupRequestHeader.java
new file mode 100644
index 0000000..9307c01
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/DeleteSubscriptionGroupRequestHeader.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.protocol.header;
+
+import com.alibaba.rocketmq.remoting.CommandCustomHeader;
+import com.alibaba.rocketmq.remoting.annotation.CFNotNull;
+import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
+
+
+/**
+ * @author manhong.yqd
+ */
+public class DeleteSubscriptionGroupRequestHeader implements CommandCustomHeader {
+    @CFNotNull
+    private String groupName;
+
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+    }
+
+
+    public String getGroupName() {
+        return groupName;
+    }
+
+
+    public void setGroupName(String groupName) {
+        this.groupName = groupName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/DeleteTopicRequestHeader.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/DeleteTopicRequestHeader.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/DeleteTopicRequestHeader.java
new file mode 100644
index 0000000..4b1a844
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/DeleteTopicRequestHeader.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * $Id: DeleteTopicRequestHeader.java 1835 2013-05-16 02:00:50Z shijia.wxr $
+ */
+package com.alibaba.rocketmq.common.protocol.header;
+
+import com.alibaba.rocketmq.remoting.CommandCustomHeader;
+import com.alibaba.rocketmq.remoting.annotation.CFNotNull;
+import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class DeleteTopicRequestHeader implements CommandCustomHeader {
+    @CFNotNull
+    private String topic;
+
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+    }
+
+
+    public String getTopic() {
+        return topic;
+    }
+
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+}