You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 09:14:00 UTC

[09/99] [abbrv] [partial] incubator-rocketmq git commit: ROCKETMQ-18 Rename package name from com.alibaba to org.apache

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/body/KVTable.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/KVTable.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/KVTable.java
new file mode 100644
index 0000000..09b090b
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/KVTable.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.body;
+
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.HashMap;
+
+
+/**
+ * @author shijia.wxr
+ *
+ */
+public class KVTable extends RemotingSerializable {
+    private HashMap<String, String> table = new HashMap<String, String>();
+
+
+    public HashMap<String, String> getTable() {
+        return table;
+    }
+
+
+    public void setTable(HashMap<String, String> table) {
+        this.table = table;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/body/LockBatchRequestBody.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/LockBatchRequestBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/LockBatchRequestBody.java
new file mode 100644
index 0000000..87e4d6a
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/LockBatchRequestBody.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.body;
+
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.HashSet;
+import java.util.Set;
+
+
+/**
+ * @author shijia.wxr
+ *
+ */
+public class LockBatchRequestBody extends RemotingSerializable {
+    private String consumerGroup;
+    private String clientId;
+    private Set<MessageQueue> mqSet = new HashSet<MessageQueue>();
+
+
+    public String getConsumerGroup() {
+        return consumerGroup;
+    }
+
+
+    public void setConsumerGroup(String consumerGroup) {
+        this.consumerGroup = consumerGroup;
+    }
+
+
+    public String getClientId() {
+        return clientId;
+    }
+
+
+    public void setClientId(String clientId) {
+        this.clientId = clientId;
+    }
+
+
+    public Set<MessageQueue> getMqSet() {
+        return mqSet;
+    }
+
+
+    public void setMqSet(Set<MessageQueue> mqSet) {
+        this.mqSet = mqSet;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/body/LockBatchResponseBody.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/LockBatchResponseBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/LockBatchResponseBody.java
new file mode 100644
index 0000000..04a5106
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/LockBatchResponseBody.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.body;
+
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.HashSet;
+import java.util.Set;
+
+
+/**
+ * @author shijia.wxr
+ *
+ */
+public class LockBatchResponseBody extends RemotingSerializable {
+
+    private Set<MessageQueue> lockOKMQSet = new HashSet<MessageQueue>();
+
+
+    public Set<MessageQueue> getLockOKMQSet() {
+        return lockOKMQSet;
+    }
+
+
+    public void setLockOKMQSet(Set<MessageQueue> lockOKMQSet) {
+        this.lockOKMQSet = lockOKMQSet;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/body/ProcessQueueInfo.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ProcessQueueInfo.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ProcessQueueInfo.java
new file mode 100644
index 0000000..0ceaa69
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ProcessQueueInfo.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.body;
+
+import org.apache.rocketmq.common.UtilAll;
+
+
+public class ProcessQueueInfo {
+    private long commitOffset;
+
+    private long cachedMsgMinOffset;
+    private long cachedMsgMaxOffset;
+    private int cachedMsgCount;
+
+    private long transactionMsgMinOffset;
+    private long transactionMsgMaxOffset;
+    private int transactionMsgCount;
+
+    private boolean locked;
+    private long tryUnlockTimes;
+    private long lastLockTimestamp;
+
+    private boolean droped;
+    private long lastPullTimestamp;
+    private long lastConsumeTimestamp;
+
+
+    public long getCommitOffset() {
+        return commitOffset;
+    }
+
+
+    public void setCommitOffset(long commitOffset) {
+        this.commitOffset = commitOffset;
+    }
+
+
+    public long getCachedMsgMinOffset() {
+        return cachedMsgMinOffset;
+    }
+
+
+    public void setCachedMsgMinOffset(long cachedMsgMinOffset) {
+        this.cachedMsgMinOffset = cachedMsgMinOffset;
+    }
+
+
+    public long getCachedMsgMaxOffset() {
+        return cachedMsgMaxOffset;
+    }
+
+
+    public void setCachedMsgMaxOffset(long cachedMsgMaxOffset) {
+        this.cachedMsgMaxOffset = cachedMsgMaxOffset;
+    }
+
+
+    public int getCachedMsgCount() {
+        return cachedMsgCount;
+    }
+
+
+    public void setCachedMsgCount(int cachedMsgCount) {
+        this.cachedMsgCount = cachedMsgCount;
+    }
+
+
+    public long getTransactionMsgMinOffset() {
+        return transactionMsgMinOffset;
+    }
+
+
+    public void setTransactionMsgMinOffset(long transactionMsgMinOffset) {
+        this.transactionMsgMinOffset = transactionMsgMinOffset;
+    }
+
+
+    public long getTransactionMsgMaxOffset() {
+        return transactionMsgMaxOffset;
+    }
+
+
+    public void setTransactionMsgMaxOffset(long transactionMsgMaxOffset) {
+        this.transactionMsgMaxOffset = transactionMsgMaxOffset;
+    }
+
+
+    public int getTransactionMsgCount() {
+        return transactionMsgCount;
+    }
+
+
+    public void setTransactionMsgCount(int transactionMsgCount) {
+        this.transactionMsgCount = transactionMsgCount;
+    }
+
+
+    public boolean isLocked() {
+        return locked;
+    }
+
+
+    public void setLocked(boolean locked) {
+        this.locked = locked;
+    }
+
+
+    public long getTryUnlockTimes() {
+        return tryUnlockTimes;
+    }
+
+
+    public void setTryUnlockTimes(long tryUnlockTimes) {
+        this.tryUnlockTimes = tryUnlockTimes;
+    }
+
+
+    public long getLastLockTimestamp() {
+        return lastLockTimestamp;
+    }
+
+
+    public void setLastLockTimestamp(long lastLockTimestamp) {
+        this.lastLockTimestamp = lastLockTimestamp;
+    }
+
+
+    public boolean isDroped() {
+        return droped;
+    }
+
+
+    public void setDroped(boolean droped) {
+        this.droped = droped;
+    }
+
+
+    public long getLastPullTimestamp() {
+        return lastPullTimestamp;
+    }
+
+
+    public void setLastPullTimestamp(long lastPullTimestamp) {
+        this.lastPullTimestamp = lastPullTimestamp;
+    }
+
+
+    public long getLastConsumeTimestamp() {
+        return lastConsumeTimestamp;
+    }
+
+
+    public void setLastConsumeTimestamp(long lastConsumeTimestamp) {
+        this.lastConsumeTimestamp = lastConsumeTimestamp;
+    }
+
+
+    @Override
+    public String toString() {
+        return "ProcessQueueInfo [commitOffset=" + commitOffset + ", cachedMsgMinOffset="
+                + cachedMsgMinOffset + ", cachedMsgMaxOffset=" + cachedMsgMaxOffset + ", cachedMsgCount="
+                + cachedMsgCount + ", transactionMsgMinOffset=" + transactionMsgMinOffset
+                + ", transactionMsgMaxOffset=" + transactionMsgMaxOffset + ", transactionMsgCount="
+                + transactionMsgCount + ", locked=" + locked + ", tryUnlockTimes=" + tryUnlockTimes
+                + ", lastLockTimestamp=" + UtilAll.timeMillisToHumanString(lastLockTimestamp) + ", droped="
+                + droped + ", lastPullTimestamp=" + UtilAll.timeMillisToHumanString(lastPullTimestamp)
+                + ", lastConsumeTimestamp=" + UtilAll.timeMillisToHumanString(lastConsumeTimestamp) + "]";
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/body/ProducerConnection.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ProducerConnection.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ProducerConnection.java
new file mode 100644
index 0000000..3d036fc
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ProducerConnection.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.body;
+
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.HashSet;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class ProducerConnection extends RemotingSerializable {
+    private HashSet<Connection> connectionSet = new HashSet<Connection>();
+
+
+    public HashSet<Connection> getConnectionSet() {
+        return connectionSet;
+    }
+
+
+    public void setConnectionSet(HashSet<Connection> connectionSet) {
+        this.connectionSet = connectionSet;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryConsumeTimeSpanBody.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryConsumeTimeSpanBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryConsumeTimeSpanBody.java
new file mode 100644
index 0000000..98279c8
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryConsumeTimeSpanBody.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.body;
+
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * @author manhong.yqd
+ */
+public class QueryConsumeTimeSpanBody extends RemotingSerializable {
+    List<QueueTimeSpan> consumeTimeSpanSet = new ArrayList<QueueTimeSpan>();
+
+
+    public List<QueueTimeSpan> getConsumeTimeSpanSet() {
+        return consumeTimeSpanSet;
+    }
+
+
+    public void setConsumeTimeSpanSet(List<QueueTimeSpan> consumeTimeSpanSet) {
+        this.consumeTimeSpanSet = consumeTimeSpanSet;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryCorrectionOffsetBody.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryCorrectionOffsetBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryCorrectionOffsetBody.java
new file mode 100644
index 0000000..efe491d
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryCorrectionOffsetBody.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.body;
+
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * @author manhong.yqd
+ */
+public class QueryCorrectionOffsetBody extends RemotingSerializable {
+    private Map<Integer, Long> correctionOffsets = new HashMap<Integer, Long>();
+
+
+    public Map<Integer, Long> getCorrectionOffsets() {
+        return correctionOffsets;
+    }
+
+
+    public void setCorrectionOffsets(Map<Integer, Long> correctionOffsets) {
+        this.correctionOffsets = correctionOffsets;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueueTimeSpan.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueueTimeSpan.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueueTimeSpan.java
new file mode 100644
index 0000000..c959c59
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueueTimeSpan.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.body;
+
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import java.util.Date;
+
+
+/**
+ * @author manhong.yqd
+ */
+public class QueueTimeSpan {
+    private MessageQueue messageQueue;
+    private long minTimeStamp;
+    private long maxTimeStamp;
+    private long consumeTimeStamp;
+    private long delayTime;
+
+
+    public MessageQueue getMessageQueue() {
+        return messageQueue;
+    }
+
+
+    public void setMessageQueue(MessageQueue messageQueue) {
+        this.messageQueue = messageQueue;
+    }
+
+
+    public long getMinTimeStamp() {
+        return minTimeStamp;
+    }
+
+
+    public void setMinTimeStamp(long minTimeStamp) {
+        this.minTimeStamp = minTimeStamp;
+    }
+
+
+    public long getMaxTimeStamp() {
+        return maxTimeStamp;
+    }
+
+
+    public void setMaxTimeStamp(long maxTimeStamp) {
+        this.maxTimeStamp = maxTimeStamp;
+    }
+
+
+    public long getConsumeTimeStamp() {
+        return consumeTimeStamp;
+    }
+
+
+    public void setConsumeTimeStamp(long consumeTimeStamp) {
+        this.consumeTimeStamp = consumeTimeStamp;
+    }
+
+
+    public String getMinTimeStampStr() {
+        return UtilAll.formatDate(new Date(minTimeStamp), UtilAll.YYYY_MM_DD_HH_MM_SS_SSS);
+    }
+
+
+    public String getMaxTimeStampStr() {
+        return UtilAll.formatDate(new Date(maxTimeStamp), UtilAll.YYYY_MM_DD_HH_MM_SS_SSS);
+    }
+
+
+    public String getConsumeTimeStampStr() {
+        return UtilAll.formatDate(new Date(consumeTimeStamp), UtilAll.YYYY_MM_DD_HH_MM_SS_SSS);
+    }
+
+
+    public long getDelayTime() {
+        return delayTime;
+    }
+
+
+    public void setDelayTime(long delayTime) {
+        this.delayTime = delayTime;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java
new file mode 100644
index 0000000..45a4a29
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/RegisterBrokerBody.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.body;
+
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+public class RegisterBrokerBody extends RemotingSerializable {
+    private TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
+    private List<String> filterServerList = new ArrayList<String>();
+
+
+    public TopicConfigSerializeWrapper getTopicConfigSerializeWrapper() {
+        return topicConfigSerializeWrapper;
+    }
+
+
+    public void setTopicConfigSerializeWrapper(TopicConfigSerializeWrapper topicConfigSerializeWrapper) {
+        this.topicConfigSerializeWrapper = topicConfigSerializeWrapper;
+    }
+
+
+    public List<String> getFilterServerList() {
+        return filterServerList;
+    }
+
+
+    public void setFilterServerList(List<String> filterServerList) {
+        this.filterServerList = filterServerList;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/body/ResetOffsetBody.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ResetOffsetBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ResetOffsetBody.java
new file mode 100644
index 0000000..c311436
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ResetOffsetBody.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.body;
+
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.Map;
+
+
+/**
+ * @author manhong.yqd
+ *
+ */
+public class ResetOffsetBody extends RemotingSerializable {
+    private Map<MessageQueue, Long> offsetTable;
+
+
+    public Map<MessageQueue, Long> getOffsetTable() {
+        return offsetTable;
+    }
+
+
+    public void setOffsetTable(Map<MessageQueue, Long> offsetTable) {
+        this.offsetTable = offsetTable;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/body/ResetOffsetBodyForC.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ResetOffsetBodyForC.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ResetOffsetBodyForC.java
new file mode 100644
index 0000000..3502605
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ResetOffsetBodyForC.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.body;
+
+import org.apache.rocketmq.common.message.MessageQueueForC;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.List;
+
+public class ResetOffsetBodyForC extends RemotingSerializable {
+
+    private List<MessageQueueForC> offsetTable;
+
+
+    public List<MessageQueueForC> getOffsetTable() {
+        return offsetTable;
+    }
+
+
+    public void setOffsetTable(List<MessageQueueForC> offsetTable) {
+        this.offsetTable = offsetTable;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/body/SubscriptionGroupWrapper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/SubscriptionGroupWrapper.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/SubscriptionGroupWrapper.java
new file mode 100644
index 0000000..ba63926
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/SubscriptionGroupWrapper.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.body;
+
+import org.apache.rocketmq.common.DataVersion;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * @author manhong.yqd
+ */
+public class SubscriptionGroupWrapper extends RemotingSerializable {
+    private ConcurrentHashMap<String, SubscriptionGroupConfig> subscriptionGroupTable =
+            new ConcurrentHashMap<String, SubscriptionGroupConfig>(1024);
+    private DataVersion dataVersion = new DataVersion();
+
+
+    public ConcurrentHashMap<String, SubscriptionGroupConfig> getSubscriptionGroupTable() {
+        return subscriptionGroupTable;
+    }
+
+
+    public void setSubscriptionGroupTable(
+            ConcurrentHashMap<String, SubscriptionGroupConfig> subscriptionGroupTable) {
+        this.subscriptionGroupTable = subscriptionGroupTable;
+    }
+
+
+    public DataVersion getDataVersion() {
+        return dataVersion;
+    }
+
+
+    public void setDataVersion(DataVersion dataVersion) {
+        this.dataVersion = dataVersion;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java
new file mode 100644
index 0000000..f91caa6
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.body;
+
+import org.apache.rocketmq.common.DataVersion;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+
+public class TopicConfigSerializeWrapper extends RemotingSerializable {
+    private ConcurrentHashMap<String, TopicConfig> topicConfigTable =
+            new ConcurrentHashMap<String, TopicConfig>();
+    private DataVersion dataVersion = new DataVersion();
+
+
+    public ConcurrentHashMap<String, TopicConfig> getTopicConfigTable() {
+        return topicConfigTable;
+    }
+
+
+    public void setTopicConfigTable(ConcurrentHashMap<String, TopicConfig> topicConfigTable) {
+        this.topicConfigTable = topicConfigTable;
+    }
+
+
+    public DataVersion getDataVersion() {
+        return dataVersion;
+    }
+
+
+    public void setDataVersion(DataVersion dataVersion) {
+        this.dataVersion = dataVersion;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicList.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicList.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicList.java
new file mode 100644
index 0000000..7cc7b30
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicList.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.body;
+
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.HashSet;
+import java.util.Set;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class TopicList extends RemotingSerializable {
+    private Set<String> topicList = new HashSet<String>();
+    private String brokerAddr;
+
+
+    public Set<String> getTopicList() {
+        return topicList;
+    }
+
+
+    public void setTopicList(Set<String> topicList) {
+        this.topicList = topicList;
+    }
+
+
+    public String getBrokerAddr() {
+        return brokerAddr;
+    }
+
+
+    public void setBrokerAddr(String brokerAddr) {
+        this.brokerAddr = brokerAddr;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/body/UnlockBatchRequestBody.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/UnlockBatchRequestBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/UnlockBatchRequestBody.java
new file mode 100644
index 0000000..66e902c
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/UnlockBatchRequestBody.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.body;
+
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.HashSet;
+import java.util.Set;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class UnlockBatchRequestBody extends RemotingSerializable {
+    private String consumerGroup;
+    private String clientId;
+    private Set<MessageQueue> mqSet = new HashSet<MessageQueue>();
+
+
+    public String getConsumerGroup() {
+        return consumerGroup;
+    }
+
+
+    public void setConsumerGroup(String consumerGroup) {
+        this.consumerGroup = consumerGroup;
+    }
+
+
+    public String getClientId() {
+        return clientId;
+    }
+
+
+    public void setClientId(String clientId) {
+        this.clientId = clientId;
+    }
+
+
+    public Set<MessageQueue> getMqSet() {
+        return mqSet;
+    }
+
+
+    public void setMqSet(Set<MessageQueue> mqSet) {
+        this.mqSet = mqSet;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/header/CheckTransactionStateRequestHeader.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/CheckTransactionStateRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/CheckTransactionStateRequestHeader.java
new file mode 100644
index 0000000..5a7da65
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/CheckTransactionStateRequestHeader.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * $Id: EndTransactionRequestHeader.java 1835 2013-05-16 02:00:50Z shijia.wxr $
+ */
+package org.apache.rocketmq.common.protocol.header;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class CheckTransactionStateRequestHeader implements CommandCustomHeader {
+    @CFNotNull
+    private Long tranStateTableOffset;
+    @CFNotNull
+    private Long commitLogOffset;
+    private String msgId;
+    private String transactionId;
+
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+    }
+
+
+    public Long getTranStateTableOffset() {
+        return tranStateTableOffset;
+    }
+
+
+    public void setTranStateTableOffset(Long tranStateTableOffset) {
+        this.tranStateTableOffset = tranStateTableOffset;
+    }
+
+
+    public Long getCommitLogOffset() {
+        return commitLogOffset;
+    }
+
+
+    public void setCommitLogOffset(Long commitLogOffset) {
+        this.commitLogOffset = commitLogOffset;
+    }
+
+    public String getMsgId() {
+        return msgId;
+    }
+
+    public void setMsgId(String msgId) {
+        this.msgId = msgId;
+    }
+
+    public String getTransactionId() {
+        return transactionId;
+    }
+
+    public void setTransactionId(String transactionId) {
+        this.transactionId = transactionId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/header/CheckTransactionStateResponseHeader.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/CheckTransactionStateResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/CheckTransactionStateResponseHeader.java
new file mode 100644
index 0000000..4f8864e
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/CheckTransactionStateResponseHeader.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * $Id: EndTransactionResponseHeader.java 1835 2013-05-16 02:00:50Z shijia.wxr $
+ */
+package org.apache.rocketmq.common.protocol.header;
+
+import org.apache.rocketmq.common.sysflag.MessageSysFlag;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class CheckTransactionStateResponseHeader implements CommandCustomHeader {
+    @CFNotNull
+    private String producerGroup;
+    @CFNotNull
+    private Long tranStateTableOffset;
+    @CFNotNull
+    private Long commitLogOffset;
+    @CFNotNull
+    private Integer commitOrRollback; // TRANSACTION_COMMIT_TYPE
+
+
+    // TRANSACTION_ROLLBACK_TYPE
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+        if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == this.commitOrRollback) {
+            return;
+        }
+
+        if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == this.commitOrRollback) {
+            return;
+        }
+
+        throw new RemotingCommandException("commitOrRollback field wrong");
+    }
+
+
+    public String getProducerGroup() {
+        return producerGroup;
+    }
+
+
+    public void setProducerGroup(String producerGroup) {
+        this.producerGroup = producerGroup;
+    }
+
+
+    public Long getTranStateTableOffset() {
+        return tranStateTableOffset;
+    }
+
+
+    public void setTranStateTableOffset(Long tranStateTableOffset) {
+        this.tranStateTableOffset = tranStateTableOffset;
+    }
+
+
+    public Long getCommitLogOffset() {
+        return commitLogOffset;
+    }
+
+
+    public void setCommitLogOffset(Long commitLogOffset) {
+        this.commitLogOffset = commitLogOffset;
+    }
+
+
+    public Integer getCommitOrRollback() {
+        return commitOrRollback;
+    }
+
+
+    public void setCommitOrRollback(Integer commitOrRollback) {
+        this.commitOrRollback = commitOrRollback;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/header/CloneGroupOffsetRequestHeader.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/CloneGroupOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/CloneGroupOffsetRequestHeader.java
new file mode 100644
index 0000000..50722f3
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/CloneGroupOffsetRequestHeader.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * $Id: DeleteTopicRequestHeader.java 1835 2013-05-16 02:00:50Z shijia.wxr $
+ */
+package org.apache.rocketmq.common.protocol.header;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+
+/**
+ * @author manhong.yqd
+ */
+public class CloneGroupOffsetRequestHeader implements CommandCustomHeader {
+    @CFNotNull
+    private String srcGroup;
+    @CFNotNull
+    private String destGroup;
+    private String topic;
+    private boolean offline;
+
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+    }
+
+
+    public String getDestGroup() {
+        return destGroup;
+    }
+
+
+    public void setDestGroup(String destGroup) {
+        this.destGroup = destGroup;
+    }
+
+
+    public String getTopic() {
+        return topic;
+    }
+
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+
+    public String getSrcGroup() {
+
+        return srcGroup;
+    }
+
+
+    public void setSrcGroup(String srcGroup) {
+        this.srcGroup = srcGroup;
+    }
+
+
+    public boolean isOffline() {
+        return offline;
+    }
+
+
+    public void setOffline(boolean offline) {
+        this.offline = offline;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/header/ConsumeMessageDirectlyResultRequestHeader.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/ConsumeMessageDirectlyResultRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ConsumeMessageDirectlyResultRequestHeader.java
new file mode 100644
index 0000000..aea3092
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ConsumeMessageDirectlyResultRequestHeader.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.header;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.annotation.CFNullable;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+
+public class ConsumeMessageDirectlyResultRequestHeader implements CommandCustomHeader {
+    @CFNotNull
+    private String consumerGroup;
+    @CFNullable
+    private String clientId;
+    @CFNullable
+    private String msgId;
+    @CFNullable
+    private String brokerName;
+
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+    }
+
+
+    public String getConsumerGroup() {
+        return consumerGroup;
+    }
+
+
+    public void setConsumerGroup(String consumerGroup) {
+        this.consumerGroup = consumerGroup;
+    }
+
+
+    public String getBrokerName() {
+        return brokerName;
+    }
+
+
+    public void setBrokerName(String brokerName) {
+        this.brokerName = brokerName;
+    }
+
+
+    public String getClientId() {
+        return clientId;
+    }
+
+
+    public void setClientId(String clientId) {
+        this.clientId = clientId;
+    }
+
+
+    public String getMsgId() {
+        return msgId;
+    }
+
+
+    public void setMsgId(String msgId) {
+        this.msgId = msgId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/header/ConsumerSendMsgBackRequestHeader.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/ConsumerSendMsgBackRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ConsumerSendMsgBackRequestHeader.java
new file mode 100644
index 0000000..64a60b0
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ConsumerSendMsgBackRequestHeader.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.header;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.annotation.CFNullable;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+
+/**
+ * @author shijia.wxr
+ *
+ */
+public class ConsumerSendMsgBackRequestHeader implements CommandCustomHeader {
+    @CFNotNull
+    private Long offset;
+    @CFNotNull
+    private String group;
+    @CFNotNull
+    private Integer delayLevel;
+    private String originMsgId;
+    private String originTopic;
+    @CFNullable
+    private boolean unitMode = false;
+    private Integer maxReconsumeTimes;
+
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+
+    }
+
+
+    public Long getOffset() {
+        return offset;
+    }
+
+
+    public void setOffset(Long offset) {
+        this.offset = offset;
+    }
+
+
+    public String getGroup() {
+        return group;
+    }
+
+
+    public void setGroup(String group) {
+        this.group = group;
+    }
+
+
+    public Integer getDelayLevel() {
+        return delayLevel;
+    }
+
+
+    public void setDelayLevel(Integer delayLevel) {
+        this.delayLevel = delayLevel;
+    }
+
+
+    public String getOriginMsgId() {
+        return originMsgId;
+    }
+
+
+    public void setOriginMsgId(String originMsgId) {
+        this.originMsgId = originMsgId;
+    }
+
+
+    public String getOriginTopic() {
+        return originTopic;
+    }
+
+
+    public void setOriginTopic(String originTopic) {
+        this.originTopic = originTopic;
+    }
+
+
+    public boolean isUnitMode() {
+        return unitMode;
+    }
+
+
+    public void setUnitMode(boolean unitMode) {
+        this.unitMode = unitMode;
+    }
+
+
+    public Integer getMaxReconsumeTimes() {
+        return maxReconsumeTimes;
+    }
+
+
+    public void setMaxReconsumeTimes(final Integer maxReconsumeTimes) {
+        this.maxReconsumeTimes = maxReconsumeTimes;
+    }
+
+
+    @Override
+    public String toString() {
+        return "ConsumerSendMsgBackRequestHeader [group=" + group + ", originTopic=" + originTopic + ", originMsgId=" + originMsgId
+                + ", delayLevel=" + delayLevel + ", unitMode=" + unitMode + ", maxReconsumeTimes=" + maxReconsumeTimes + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/header/CreateTopicRequestHeader.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/CreateTopicRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/CreateTopicRequestHeader.java
new file mode 100644
index 0000000..6eb85b0
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/CreateTopicRequestHeader.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * $Id: CreateTopicRequestHeader.java 1835 2013-05-16 02:00:50Z shijia.wxr $
+ */
+package org.apache.rocketmq.common.protocol.header;
+
+import org.apache.rocketmq.common.TopicFilterType;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class CreateTopicRequestHeader implements CommandCustomHeader {
+    @CFNotNull
+    private String topic;
+    @CFNotNull
+    private String defaultTopic;
+    @CFNotNull
+    private Integer readQueueNums;
+    @CFNotNull
+    private Integer writeQueueNums;
+    @CFNotNull
+    private Integer perm;
+    @CFNotNull
+    private String topicFilterType;
+    private Integer topicSysFlag;
+    @CFNotNull
+    private Boolean order = false;
+
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+        try {
+            TopicFilterType.valueOf(this.topicFilterType);
+        } catch (Exception e) {
+            throw new RemotingCommandException("topicFilterType = [" + topicFilterType + "] value invalid", e);
+        }
+    }
+
+
+    public TopicFilterType getTopicFilterTypeEnum() {
+        return TopicFilterType.valueOf(this.topicFilterType);
+    }
+
+
+    public String getTopic() {
+        return topic;
+    }
+
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+
+    public String getDefaultTopic() {
+        return defaultTopic;
+    }
+
+
+    public void setDefaultTopic(String defaultTopic) {
+        this.defaultTopic = defaultTopic;
+    }
+
+
+    public Integer getReadQueueNums() {
+        return readQueueNums;
+    }
+
+
+    public void setReadQueueNums(Integer readQueueNums) {
+        this.readQueueNums = readQueueNums;
+    }
+
+
+    public Integer getWriteQueueNums() {
+        return writeQueueNums;
+    }
+
+
+    public void setWriteQueueNums(Integer writeQueueNums) {
+        this.writeQueueNums = writeQueueNums;
+    }
+
+
+    public Integer getPerm() {
+        return perm;
+    }
+
+
+    public void setPerm(Integer perm) {
+        this.perm = perm;
+    }
+
+
+    public String getTopicFilterType() {
+        return topicFilterType;
+    }
+
+
+    public void setTopicFilterType(String topicFilterType) {
+        this.topicFilterType = topicFilterType;
+    }
+
+
+    public Integer getTopicSysFlag() {
+        return topicSysFlag;
+    }
+
+
+    public void setTopicSysFlag(Integer topicSysFlag) {
+        this.topicSysFlag = topicSysFlag;
+    }
+
+
+    public Boolean getOrder() {
+        return order;
+    }
+
+
+    public void setOrder(Boolean order) {
+        this.order = order;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/header/DeleteSubscriptionGroupRequestHeader.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/DeleteSubscriptionGroupRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/DeleteSubscriptionGroupRequestHeader.java
new file mode 100644
index 0000000..783b37c
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/DeleteSubscriptionGroupRequestHeader.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.header;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+
+/**
+ * @author manhong.yqd
+ */
+public class DeleteSubscriptionGroupRequestHeader implements CommandCustomHeader {
+    @CFNotNull
+    private String groupName;
+
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+    }
+
+
+    public String getGroupName() {
+        return groupName;
+    }
+
+
+    public void setGroupName(String groupName) {
+        this.groupName = groupName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/header/DeleteTopicRequestHeader.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/DeleteTopicRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/DeleteTopicRequestHeader.java
new file mode 100644
index 0000000..cc0c324
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/DeleteTopicRequestHeader.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * $Id: DeleteTopicRequestHeader.java 1835 2013-05-16 02:00:50Z shijia.wxr $
+ */
+package org.apache.rocketmq.common.protocol.header;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class DeleteTopicRequestHeader implements CommandCustomHeader {
+    @CFNotNull
+    private String topic;
+
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+    }
+
+
+    public String getTopic() {
+        return topic;
+    }
+
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/header/EndTransactionRequestHeader.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/EndTransactionRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/EndTransactionRequestHeader.java
new file mode 100644
index 0000000..ce9f170
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/EndTransactionRequestHeader.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * $Id: EndTransactionRequestHeader.java 1835 2013-05-16 02:00:50Z shijia.wxr $
+ */
+package org.apache.rocketmq.common.protocol.header;
+
+import org.apache.rocketmq.common.sysflag.MessageSysFlag;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.annotation.CFNullable;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class EndTransactionRequestHeader implements CommandCustomHeader {
+    @CFNotNull
+    private String producerGroup;
+    @CFNotNull
+    private Long tranStateTableOffset;
+    @CFNotNull
+    private Long commitLogOffset;
+    @CFNotNull
+    private Integer commitOrRollback; // TRANSACTION_COMMIT_TYPE
+    // TRANSACTION_ROLLBACK_TYPE
+    // TRANSACTION_NOT_TYPE
+
+    @CFNullable
+    private Boolean fromTransactionCheck = false;
+
+    @CFNotNull
+    private String msgId;
+
+    private String transactionId;
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+        if (MessageSysFlag.TRANSACTION_NOT_TYPE == this.commitOrRollback) {
+            return;
+        }
+
+        if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == this.commitOrRollback) {
+            return;
+        }
+
+        if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == this.commitOrRollback) {
+            return;
+        }
+
+        throw new RemotingCommandException("commitOrRollback field wrong");
+    }
+
+
+    public String getProducerGroup() {
+        return producerGroup;
+    }
+
+
+    public void setProducerGroup(String producerGroup) {
+        this.producerGroup = producerGroup;
+    }
+
+
+    public Long getTranStateTableOffset() {
+        return tranStateTableOffset;
+    }
+
+
+    public void setTranStateTableOffset(Long tranStateTableOffset) {
+        this.tranStateTableOffset = tranStateTableOffset;
+    }
+
+
+    public Long getCommitLogOffset() {
+        return commitLogOffset;
+    }
+
+
+    public void setCommitLogOffset(Long commitLogOffset) {
+        this.commitLogOffset = commitLogOffset;
+    }
+
+
+    public Integer getCommitOrRollback() {
+        return commitOrRollback;
+    }
+
+
+    public void setCommitOrRollback(Integer commitOrRollback) {
+        this.commitOrRollback = commitOrRollback;
+    }
+
+
+    public Boolean getFromTransactionCheck() {
+        return fromTransactionCheck;
+    }
+
+
+    public void setFromTransactionCheck(Boolean fromTransactionCheck) {
+        this.fromTransactionCheck = fromTransactionCheck;
+    }
+
+
+    public String getMsgId() {
+        return msgId;
+    }
+
+
+    public void setMsgId(String msgId) {
+        this.msgId = msgId;
+    }
+
+    public String getTransactionId() {
+        return transactionId;
+    }
+
+    public void setTransactionId(String transactionId) {
+        this.transactionId = transactionId;
+    }
+
+    @Override
+    public String toString() {
+        return "EndTransactionRequestHeader [producerGroup=" + producerGroup + ", tranStateTableOffset="
+                + tranStateTableOffset + ", commitLogOffset=" + commitLogOffset + ", commitOrRollback="
+                + commitOrRollback + ", fromTransactionCheck=" + fromTransactionCheck + ", msgId=" + msgId
+                + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/header/EndTransactionResponseHeader.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/EndTransactionResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/EndTransactionResponseHeader.java
new file mode 100644
index 0000000..eb28b6a
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/EndTransactionResponseHeader.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * $Id: EndTransactionResponseHeader.java 1835 2013-05-16 02:00:50Z shijia.wxr $
+ */
+package org.apache.rocketmq.common.protocol.header;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class EndTransactionResponseHeader implements CommandCustomHeader {
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetAllTopicConfigResponseHeader.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetAllTopicConfigResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetAllTopicConfigResponseHeader.java
new file mode 100644
index 0000000..4a39a25
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetAllTopicConfigResponseHeader.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * $Id: GetAllTopicConfigResponseHeader.java 1835 2013-05-16 02:00:50Z shijia.wxr $
+ */
+package org.apache.rocketmq.common.protocol.header;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class GetAllTopicConfigResponseHeader implements CommandCustomHeader {
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetBrokerConfigResponseHeader.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetBrokerConfigResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetBrokerConfigResponseHeader.java
new file mode 100644
index 0000000..67cc8eb
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetBrokerConfigResponseHeader.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * $Id: GetBrokerConfigResponseHeader.java 1835 2013-05-16 02:00:50Z shijia.wxr $
+ */
+package org.apache.rocketmq.common.protocol.header;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class GetBrokerConfigResponseHeader implements CommandCustomHeader {
+    @CFNotNull
+    private String version;
+
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+    }
+
+
+    public String getVersion() {
+        return version;
+    }
+
+
+    public void setVersion(String version) {
+        this.version = version;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumeStatsInBrokerHeader.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumeStatsInBrokerHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumeStatsInBrokerHeader.java
new file mode 100644
index 0000000..b17a558
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumeStatsInBrokerHeader.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.header;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+
+public class GetConsumeStatsInBrokerHeader implements CommandCustomHeader {
+    @CFNotNull
+    private boolean isOrder;
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+
+    }
+
+    public boolean isOrder() {
+        return isOrder;
+    }
+
+    public void setIsOrder(boolean isOrder) {
+        this.isOrder = isOrder;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumeStatsRequestHeader.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumeStatsRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumeStatsRequestHeader.java
new file mode 100644
index 0000000..2a84db6
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumeStatsRequestHeader.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.header;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class GetConsumeStatsRequestHeader implements CommandCustomHeader {
+    @CFNotNull
+    private String consumerGroup;
+    private String topic;
+
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+        // TODO Auto-generated method stub
+
+    }
+
+
+    public String getConsumerGroup() {
+        return consumerGroup;
+    }
+
+
+    public void setConsumerGroup(String consumerGroup) {
+        this.consumerGroup = consumerGroup;
+    }
+
+
+    public String getTopic() {
+        return topic;
+    }
+
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumerConnectionListRequestHeader.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumerConnectionListRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumerConnectionListRequestHeader.java
new file mode 100644
index 0000000..e49f775
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumerConnectionListRequestHeader.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.header;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+
+/**
+ *
+ * @author shijia.wxr
+ *
+ */
+public class GetConsumerConnectionListRequestHeader implements CommandCustomHeader {
+    @CFNotNull
+    private String consumerGroup;
+
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+        // To change body of implemented methods use File | Settings | File
+        // Templates.
+    }
+
+
+    public String getConsumerGroup() {
+        return consumerGroup;
+    }
+
+
+    public void setConsumerGroup(String consumerGroup) {
+        this.consumerGroup = consumerGroup;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumerListByGroupRequestHeader.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumerListByGroupRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumerListByGroupRequestHeader.java
new file mode 100644
index 0000000..45d3a2c
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumerListByGroupRequestHeader.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.header;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class GetConsumerListByGroupRequestHeader implements CommandCustomHeader {
+    @CFNotNull
+    private String consumerGroup;
+
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+    }
+
+
+    public String getConsumerGroup() {
+        return consumerGroup;
+    }
+
+
+    public void setConsumerGroup(String consumerGroup) {
+        this.consumerGroup = consumerGroup;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumerListByGroupResponseBody.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumerListByGroupResponseBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumerListByGroupResponseBody.java
new file mode 100644
index 0000000..6563ed5
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumerListByGroupResponseBody.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.header;
+
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.List;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class GetConsumerListByGroupResponseBody extends RemotingSerializable {
+    private List<String> consumerIdList;
+
+
+    public List<String> getConsumerIdList() {
+        return consumerIdList;
+    }
+
+
+    public void setConsumerIdList(List<String> consumerIdList) {
+        this.consumerIdList = consumerIdList;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumerListByGroupResponseHeader.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumerListByGroupResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumerListByGroupResponseHeader.java
new file mode 100644
index 0000000..8719826
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumerListByGroupResponseHeader.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.header;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class GetConsumerListByGroupResponseHeader implements CommandCustomHeader {
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumerRunningInfoRequestHeader.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumerRunningInfoRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumerRunningInfoRequestHeader.java
new file mode 100644
index 0000000..fd1e589
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumerRunningInfoRequestHeader.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.header;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.annotation.CFNullable;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class GetConsumerRunningInfoRequestHeader implements CommandCustomHeader {
+    @CFNotNull
+    private String consumerGroup;
+    @CFNotNull
+    private String clientId;
+    @CFNullable
+    private boolean jstackEnable;
+
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+    }
+
+
+    public String getConsumerGroup() {
+        return consumerGroup;
+    }
+
+
+    public void setConsumerGroup(String consumerGroup) {
+        this.consumerGroup = consumerGroup;
+    }
+
+
+    public String getClientId() {
+        return clientId;
+    }
+
+
+    public void setClientId(String clientId) {
+        this.clientId = clientId;
+    }
+
+
+    public boolean isJstackEnable() {
+        return jstackEnable;
+    }
+
+
+    public void setJstackEnable(boolean jstackEnable) {
+        this.jstackEnable = jstackEnable;
+    }
+}