You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/19 09:40:39 UTC
[22/43] incubator-rocketmq git commit: Finish code dump. Reviewed by:
@yukon @vongosling @stevenschew @vintagewang @lollipop @zander
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumerConnection.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumerConnection.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumerConnection.java
new file mode 100644
index 0000000..f74c6fc
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumerConnection.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.protocol.body;
+
+import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
+import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType;
+import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
+import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.HashSet;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class ConsumerConnection extends RemotingSerializable {
+ private HashSet<Connection> connectionSet = new HashSet<Connection>();
+ private ConcurrentHashMap<String/* Topic */, SubscriptionData> subscriptionTable =
+ new ConcurrentHashMap<String, SubscriptionData>();
+ private ConsumeType consumeType;
+ private MessageModel messageModel;
+ private ConsumeFromWhere consumeFromWhere;
+
+
+ public int computeMinVersion() {
+ int minVersion = Integer.MAX_VALUE;
+ for (Connection c : this.connectionSet) {
+ if (c.getVersion() < minVersion) {
+ minVersion = c.getVersion();
+ }
+ }
+
+ return minVersion;
+ }
+
+
+ public HashSet<Connection> getConnectionSet() {
+ return connectionSet;
+ }
+
+
+ public void setConnectionSet(HashSet<Connection> connectionSet) {
+ this.connectionSet = connectionSet;
+ }
+
+
+ public ConcurrentHashMap<String, SubscriptionData> getSubscriptionTable() {
+ return subscriptionTable;
+ }
+
+
+ public void setSubscriptionTable(ConcurrentHashMap<String, SubscriptionData> subscriptionTable) {
+ this.subscriptionTable = subscriptionTable;
+ }
+
+
+ public ConsumeType getConsumeType() {
+ return consumeType;
+ }
+
+
+ public void setConsumeType(ConsumeType consumeType) {
+ this.consumeType = consumeType;
+ }
+
+
+ public MessageModel getMessageModel() {
+ return messageModel;
+ }
+
+
+ public void setMessageModel(MessageModel messageModel) {
+ this.messageModel = messageModel;
+ }
+
+
+ public ConsumeFromWhere getConsumeFromWhere() {
+ return consumeFromWhere;
+ }
+
+
+ public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) {
+ this.consumeFromWhere = consumeFromWhere;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumerOffsetSerializeWrapper.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumerOffsetSerializeWrapper.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumerOffsetSerializeWrapper.java
new file mode 100644
index 0000000..7af4e7a
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumerOffsetSerializeWrapper.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.protocol.body;
+
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * @author manhong.yqd
+ */
+public class ConsumerOffsetSerializeWrapper extends RemotingSerializable {
+ private ConcurrentHashMap<String/* topic@group */, ConcurrentHashMap<Integer, Long>> offsetTable =
+ new ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>>(512);
+
+
+ public ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> getOffsetTable() {
+ return offsetTable;
+ }
+
+
+ public void setOffsetTable(ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> offsetTable) {
+ this.offsetTable = offsetTable;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumerRunningInfo.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumerRunningInfo.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumerRunningInfo.java
new file mode 100644
index 0000000..56babc2
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ConsumerRunningInfo.java
@@ -0,0 +1,315 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.protocol.body;
+
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType;
+import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.*;
+import java.util.Map.Entry;
+
+public class ConsumerRunningInfo extends RemotingSerializable {
+ public static final String PROP_NAMESERVER_ADDR = "PROP_NAMESERVER_ADDR";
+ public static final String PROP_THREADPOOL_CORE_SIZE = "PROP_THREADPOOL_CORE_SIZE";
+ public static final String PROP_CONSUME_ORDERLY = "PROP_CONSUMEORDERLY";
+ public static final String PROP_CONSUME_TYPE = "PROP_CONSUME_TYPE";
+ public static final String PROP_CLIENT_VERSION = "PROP_CLIENT_VERSION";
+ public static final String PROP_CONSUMER_START_TIMESTAMP = "PROP_CONSUMER_START_TIMESTAMP";
+
+
+ private Properties properties = new Properties();
+
+ private TreeSet<SubscriptionData> subscriptionSet = new TreeSet<SubscriptionData>();
+
+ private TreeMap<MessageQueue, ProcessQueueInfo> mqTable = new TreeMap<MessageQueue, ProcessQueueInfo>();
+
+ private TreeMap<String/* Topic */, ConsumeStatus> statusTable = new TreeMap<String, ConsumeStatus>();
+
+ private String jstack;
+
+ public static boolean analyzeSubscription(final TreeMap<String/* clientId */, ConsumerRunningInfo> criTable) {
+ ConsumerRunningInfo prev = criTable.firstEntry().getValue();
+
+ boolean push = false;
+ {
+ String property = prev.getProperties().getProperty(ConsumerRunningInfo.PROP_CONSUME_TYPE);
+
+ if (property == null) {
+ property = ((ConsumeType) prev.getProperties().get(ConsumerRunningInfo.PROP_CONSUME_TYPE)).name();
+ }
+ push = ConsumeType.valueOf(property) == ConsumeType.CONSUME_PASSIVELY;
+ }
+
+ boolean startForAWhile = false;
+ {
+
+ String property = prev.getProperties().getProperty(ConsumerRunningInfo.PROP_CONSUMER_START_TIMESTAMP);
+ if (property == null) {
+ property = String.valueOf(prev.getProperties().get(ConsumerRunningInfo.PROP_CONSUMER_START_TIMESTAMP));
+ }
+ startForAWhile = (System.currentTimeMillis() - Long.parseLong(property)) > (1000 * 60 * 2);
+ }
+
+ if (push && startForAWhile) {
+
+ {
+ Iterator<Entry<String, ConsumerRunningInfo>> it = criTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<String, ConsumerRunningInfo> next = it.next();
+ ConsumerRunningInfo current = next.getValue();
+ boolean equals = current.getSubscriptionSet().equals(prev.getSubscriptionSet());
+
+ if (!equals) {
+ // Different subscription in the same group of consumer
+ return false;
+ }
+
+ prev = next.getValue();
+ }
+
+ if (prev != null) {
+
+ if (prev.getSubscriptionSet().isEmpty()) {
+ // Subscription empty!
+ return false;
+ }
+ }
+ }
+ }
+
+ return true;
+ }
+
+ public Properties getProperties() {
+ return properties;
+ }
+
+ public void setProperties(Properties properties) {
+ this.properties = properties;
+ }
+
+ public TreeSet<SubscriptionData> getSubscriptionSet() {
+ return subscriptionSet;
+ }
+
+ public void setSubscriptionSet(TreeSet<SubscriptionData> subscriptionSet) {
+ this.subscriptionSet = subscriptionSet;
+ }
+
+ public static boolean analyzeRebalance(final TreeMap<String/* clientId */, ConsumerRunningInfo> criTable) {
+ return true;
+ }
+
+ public static String analyzeProcessQueue(final String clientId, ConsumerRunningInfo info) {
+ StringBuilder sb = new StringBuilder();
+ boolean push = false;
+ {
+ String property = info.getProperties().getProperty(ConsumerRunningInfo.PROP_CONSUME_TYPE);
+
+ if (property == null) {
+ property = ((ConsumeType) info.getProperties().get(ConsumerRunningInfo.PROP_CONSUME_TYPE)).name();
+ }
+ push = ConsumeType.valueOf(property) == ConsumeType.CONSUME_PASSIVELY;
+ }
+
+ boolean orderMsg = false;
+ {
+ String property = info.getProperties().getProperty(ConsumerRunningInfo.PROP_CONSUME_ORDERLY);
+ orderMsg = Boolean.parseBoolean(property);
+ }
+
+ if (push) {
+ Iterator<Entry<MessageQueue, ProcessQueueInfo>> it = info.getMqTable().entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<MessageQueue, ProcessQueueInfo> next = it.next();
+ MessageQueue mq = next.getKey();
+ ProcessQueueInfo pq = next.getValue();
+
+
+ if (orderMsg) {
+
+ if (!pq.isLocked()) {
+ sb.append(String.format("%s %s can't lock for a while, %dms%n", //
+ clientId, //
+ mq, //
+ System.currentTimeMillis() - pq.getLastLockTimestamp()));
+ } else {
+ if (pq.isDroped() && (pq.getTryUnlockTimes() > 0)) {
+ sb.append(String.format("%s %s unlock %d times, still failed%n", //
+ clientId, //
+ mq, //
+ pq.getTryUnlockTimes()));
+ }
+ }
+
+
+ } else {
+ long diff = System.currentTimeMillis() - pq.getLastConsumeTimestamp();
+
+ if (diff > (1000 * 60) && pq.getCachedMsgCount() > 0) {
+ sb.append(String.format("%s %s can't consume for a while, maybe blocked, %dms%n", //
+ clientId, //
+ mq, //
+ diff));
+ }
+ }
+ }
+ }
+
+ return sb.toString();
+ }
+
+ public TreeMap<MessageQueue, ProcessQueueInfo> getMqTable() {
+ return mqTable;
+ }
+
+ public void setMqTable(TreeMap<MessageQueue, ProcessQueueInfo> mqTable) {
+ this.mqTable = mqTable;
+ }
+
+ public TreeMap<String, ConsumeStatus> getStatusTable() {
+ return statusTable;
+ }
+
+ public void setStatusTable(TreeMap<String, ConsumeStatus> statusTable) {
+ this.statusTable = statusTable;
+ }
+
+ public String formatString() {
+ StringBuilder sb = new StringBuilder();
+
+ {
+ sb.append("#Consumer Properties#\n");
+ Iterator<Entry<Object, Object>> it = this.properties.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<Object, Object> next = it.next();
+ String item = String.format("%-40s: %s%n", next.getKey().toString(), next.getValue().toString());
+ sb.append(item);
+ }
+ }
+
+ {
+ sb.append("\n\n#Consumer Subscription#\n");
+
+ Iterator<SubscriptionData> it = this.subscriptionSet.iterator();
+ int i = 0;
+ while (it.hasNext()) {
+ SubscriptionData next = it.next();
+ String item = String.format("%03d Topic: %-40s ClassFilter: %-8s SubExpression: %s%n", //
+ ++i, //
+ next.getTopic(), //
+ next.isClassFilterMode(), //
+ next.getSubString());
+
+ sb.append(item);
+ }
+ }
+
+ {
+ sb.append("\n\n#Consumer Offset#\n");
+ sb.append(String.format("%-32s %-32s %-4s %-20s%n", //
+ "#Topic", //
+ "#Broker Name", //
+ "#QID", //
+ "#Consumer Offset"//
+ ));
+
+ Iterator<Entry<MessageQueue, ProcessQueueInfo>> it = this.mqTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<MessageQueue, ProcessQueueInfo> next = it.next();
+ String item = String.format("%-32s %-32s %-4d %-20d%n", //
+ next.getKey().getTopic(), //
+ next.getKey().getBrokerName(), //
+ next.getKey().getQueueId(), //
+ next.getValue().getCommitOffset());
+
+ sb.append(item);
+ }
+ }
+
+ {
+ sb.append("\n\n#Consumer MQ Detail#\n");
+ sb.append(String.format("%-32s %-32s %-4s %-20s%n", //
+ "#Topic", //
+ "#Broker Name", //
+ "#QID", //
+ "#ProcessQueueInfo"//
+ ));
+
+ Iterator<Entry<MessageQueue, ProcessQueueInfo>> it = this.mqTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<MessageQueue, ProcessQueueInfo> next = it.next();
+ String item = String.format("%-32s %-32s %-4d %s%n", //
+ next.getKey().getTopic(), //
+ next.getKey().getBrokerName(), //
+ next.getKey().getQueueId(), //
+ next.getValue().toString());
+
+ sb.append(item);
+ }
+ }
+
+ {
+ sb.append("\n\n#Consumer RT&TPS#\n");
+ sb.append(String.format("%-32s %14s %14s %14s %14s %18s %25s%n", //
+ "#Topic", //
+ "#Pull RT", //
+ "#Pull TPS", //
+ "#Consume RT", //
+ "#ConsumeOK TPS", //
+ "#ConsumeFailed TPS", //
+ "#ConsumeFailedMsgsInHour"//
+ ));
+
+ Iterator<Entry<String, ConsumeStatus>> it = this.statusTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<String, ConsumeStatus> next = it.next();
+ String item = String.format("%-32s %14.2f %14.2f %14.2f %14.2f %18.2f %25d%n", //
+ next.getKey(), //
+ next.getValue().getPullRT(), //
+ next.getValue().getPullTPS(), //
+ next.getValue().getConsumeRT(), //
+ next.getValue().getConsumeOKTPS(), //
+ next.getValue().getConsumeFailedTPS(), //
+ next.getValue().getConsumeFailedMsgs()//
+ );
+
+ sb.append(item);
+ }
+ }
+
+ if (this.jstack != null) {
+ sb.append("\n\n#Consumer jstack#\n");
+ sb.append(this.jstack);
+ }
+
+ return sb.toString();
+ }
+
+ public String getJstack() {
+ return jstack;
+ }
+
+
+ public void setJstack(String jstack) {
+ this.jstack = jstack;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/GetConsumerStatusBody.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/GetConsumerStatusBody.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/GetConsumerStatusBody.java
new file mode 100644
index 0000000..ca84f21
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/GetConsumerStatusBody.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.protocol.body;
+
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * @author manhong.yqd
+ */
+@Deprecated
+public class GetConsumerStatusBody extends RemotingSerializable {
+ private Map<MessageQueue, Long> messageQueueTable = new HashMap<MessageQueue, Long>();
+ private Map<String, Map<MessageQueue, Long>> consumerTable =
+ new HashMap<String, Map<MessageQueue, Long>>();
+
+
+ public Map<MessageQueue, Long> getMessageQueueTable() {
+ return messageQueueTable;
+ }
+
+
+ public void setMessageQueueTable(Map<MessageQueue, Long> messageQueueTable) {
+ this.messageQueueTable = messageQueueTable;
+ }
+
+
+ public Map<String, Map<MessageQueue, Long>> getConsumerTable() {
+ return consumerTable;
+ }
+
+
+ public void setConsumerTable(Map<String, Map<MessageQueue, Long>> consumerTable) {
+ this.consumerTable = consumerTable;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/GroupList.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/GroupList.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/GroupList.java
new file mode 100644
index 0000000..9f7e500
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/GroupList.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.common.protocol.body;
+
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.HashSet;
+
+
+/**
+ * @author shijia.wxr
+ *
+ */
+public class GroupList extends RemotingSerializable {
+ private HashSet<String> groupList = new HashSet<String>();
+
+
+ public HashSet<String> getGroupList() {
+ return groupList;
+ }
+
+
+ public void setGroupList(HashSet<String> groupList) {
+ this.groupList = groupList;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/KVTable.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/KVTable.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/KVTable.java
new file mode 100644
index 0000000..41cfcb8
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/KVTable.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.common.protocol.body;
+
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.HashMap;
+
+
+/**
+ * @author shijia.wxr
+ *
+ */
+public class KVTable extends RemotingSerializable {
+ private HashMap<String, String> table = new HashMap<String, String>();
+
+
+ public HashMap<String, String> getTable() {
+ return table;
+ }
+
+
+ public void setTable(HashMap<String, String> table) {
+ this.table = table;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/LockBatchRequestBody.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/LockBatchRequestBody.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/LockBatchRequestBody.java
new file mode 100644
index 0000000..992f656
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/LockBatchRequestBody.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.protocol.body;
+
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.HashSet;
+import java.util.Set;
+
+
+/**
+ * @author shijia.wxr
+ *
+ */
+public class LockBatchRequestBody extends RemotingSerializable {
+ private String consumerGroup;
+ private String clientId;
+ private Set<MessageQueue> mqSet = new HashSet<MessageQueue>();
+
+
+ public String getConsumerGroup() {
+ return consumerGroup;
+ }
+
+
+ public void setConsumerGroup(String consumerGroup) {
+ this.consumerGroup = consumerGroup;
+ }
+
+
+ public String getClientId() {
+ return clientId;
+ }
+
+
+ public void setClientId(String clientId) {
+ this.clientId = clientId;
+ }
+
+
+ public Set<MessageQueue> getMqSet() {
+ return mqSet;
+ }
+
+
+ public void setMqSet(Set<MessageQueue> mqSet) {
+ this.mqSet = mqSet;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/LockBatchResponseBody.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/LockBatchResponseBody.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/LockBatchResponseBody.java
new file mode 100644
index 0000000..12f6c76
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/LockBatchResponseBody.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.protocol.body;
+
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.HashSet;
+import java.util.Set;
+
+
+/**
+ * @author shijia.wxr
+ *
+ */
+public class LockBatchResponseBody extends RemotingSerializable {
+
+ private Set<MessageQueue> lockOKMQSet = new HashSet<MessageQueue>();
+
+
+ public Set<MessageQueue> getLockOKMQSet() {
+ return lockOKMQSet;
+ }
+
+
+ public void setLockOKMQSet(Set<MessageQueue> lockOKMQSet) {
+ this.lockOKMQSet = lockOKMQSet;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ProcessQueueInfo.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ProcessQueueInfo.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ProcessQueueInfo.java
new file mode 100644
index 0000000..6c17443
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ProcessQueueInfo.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.protocol.body;
+
+import com.alibaba.rocketmq.common.UtilAll;
+
+
+public class ProcessQueueInfo {
+ private long commitOffset;
+
+ private long cachedMsgMinOffset;
+ private long cachedMsgMaxOffset;
+ private int cachedMsgCount;
+
+ private long transactionMsgMinOffset;
+ private long transactionMsgMaxOffset;
+ private int transactionMsgCount;
+
+ private boolean locked;
+ private long tryUnlockTimes;
+ private long lastLockTimestamp;
+
+ private boolean droped;
+ private long lastPullTimestamp;
+ private long lastConsumeTimestamp;
+
+
+ public long getCommitOffset() {
+ return commitOffset;
+ }
+
+
+ public void setCommitOffset(long commitOffset) {
+ this.commitOffset = commitOffset;
+ }
+
+
+ public long getCachedMsgMinOffset() {
+ return cachedMsgMinOffset;
+ }
+
+
+ public void setCachedMsgMinOffset(long cachedMsgMinOffset) {
+ this.cachedMsgMinOffset = cachedMsgMinOffset;
+ }
+
+
+ public long getCachedMsgMaxOffset() {
+ return cachedMsgMaxOffset;
+ }
+
+
+ public void setCachedMsgMaxOffset(long cachedMsgMaxOffset) {
+ this.cachedMsgMaxOffset = cachedMsgMaxOffset;
+ }
+
+
+ public int getCachedMsgCount() {
+ return cachedMsgCount;
+ }
+
+
+ public void setCachedMsgCount(int cachedMsgCount) {
+ this.cachedMsgCount = cachedMsgCount;
+ }
+
+
+ public long getTransactionMsgMinOffset() {
+ return transactionMsgMinOffset;
+ }
+
+
+ public void setTransactionMsgMinOffset(long transactionMsgMinOffset) {
+ this.transactionMsgMinOffset = transactionMsgMinOffset;
+ }
+
+
+ public long getTransactionMsgMaxOffset() {
+ return transactionMsgMaxOffset;
+ }
+
+
+ public void setTransactionMsgMaxOffset(long transactionMsgMaxOffset) {
+ this.transactionMsgMaxOffset = transactionMsgMaxOffset;
+ }
+
+
+ public int getTransactionMsgCount() {
+ return transactionMsgCount;
+ }
+
+
+ public void setTransactionMsgCount(int transactionMsgCount) {
+ this.transactionMsgCount = transactionMsgCount;
+ }
+
+
+ public boolean isLocked() {
+ return locked;
+ }
+
+
+ public void setLocked(boolean locked) {
+ this.locked = locked;
+ }
+
+
+ public long getTryUnlockTimes() {
+ return tryUnlockTimes;
+ }
+
+
+ public void setTryUnlockTimes(long tryUnlockTimes) {
+ this.tryUnlockTimes = tryUnlockTimes;
+ }
+
+
+ public long getLastLockTimestamp() {
+ return lastLockTimestamp;
+ }
+
+
+ public void setLastLockTimestamp(long lastLockTimestamp) {
+ this.lastLockTimestamp = lastLockTimestamp;
+ }
+
+
+ public boolean isDroped() {
+ return droped;
+ }
+
+
+ public void setDroped(boolean droped) {
+ this.droped = droped;
+ }
+
+
+ public long getLastPullTimestamp() {
+ return lastPullTimestamp;
+ }
+
+
+ public void setLastPullTimestamp(long lastPullTimestamp) {
+ this.lastPullTimestamp = lastPullTimestamp;
+ }
+
+
+ public long getLastConsumeTimestamp() {
+ return lastConsumeTimestamp;
+ }
+
+
+ public void setLastConsumeTimestamp(long lastConsumeTimestamp) {
+ this.lastConsumeTimestamp = lastConsumeTimestamp;
+ }
+
+
+ @Override
+ public String toString() {
+ return "ProcessQueueInfo [commitOffset=" + commitOffset + ", cachedMsgMinOffset="
+ + cachedMsgMinOffset + ", cachedMsgMaxOffset=" + cachedMsgMaxOffset + ", cachedMsgCount="
+ + cachedMsgCount + ", transactionMsgMinOffset=" + transactionMsgMinOffset
+ + ", transactionMsgMaxOffset=" + transactionMsgMaxOffset + ", transactionMsgCount="
+ + transactionMsgCount + ", locked=" + locked + ", tryUnlockTimes=" + tryUnlockTimes
+ + ", lastLockTimestamp=" + UtilAll.timeMillisToHumanString(lastLockTimestamp) + ", droped="
+ + droped + ", lastPullTimestamp=" + UtilAll.timeMillisToHumanString(lastPullTimestamp)
+ + ", lastConsumeTimestamp=" + UtilAll.timeMillisToHumanString(lastConsumeTimestamp) + "]";
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ProducerConnection.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ProducerConnection.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ProducerConnection.java
new file mode 100644
index 0000000..32fe1d0
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ProducerConnection.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.protocol.body;
+
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.HashSet;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class ProducerConnection extends RemotingSerializable {
+ private HashSet<Connection> connectionSet = new HashSet<Connection>();
+
+
+ public HashSet<Connection> getConnectionSet() {
+ return connectionSet;
+ }
+
+
+ public void setConnectionSet(HashSet<Connection> connectionSet) {
+ this.connectionSet = connectionSet;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/QueryConsumeTimeSpanBody.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/QueryConsumeTimeSpanBody.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/QueryConsumeTimeSpanBody.java
new file mode 100644
index 0000000..2f52666
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/QueryConsumeTimeSpanBody.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.protocol.body;
+
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * @author manhong.yqd
+ */
+public class QueryConsumeTimeSpanBody extends RemotingSerializable {
+ List<QueueTimeSpan> consumeTimeSpanSet = new ArrayList<QueueTimeSpan>();
+
+
+ public List<QueueTimeSpan> getConsumeTimeSpanSet() {
+ return consumeTimeSpanSet;
+ }
+
+
+ public void setConsumeTimeSpanSet(List<QueueTimeSpan> consumeTimeSpanSet) {
+ this.consumeTimeSpanSet = consumeTimeSpanSet;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/QueryCorrectionOffsetBody.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/QueryCorrectionOffsetBody.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/QueryCorrectionOffsetBody.java
new file mode 100644
index 0000000..225b90c
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/QueryCorrectionOffsetBody.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.common.protocol.body;
+
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * @author manhong.yqd
+ */
+public class QueryCorrectionOffsetBody extends RemotingSerializable {
+ private Map<Integer, Long> correctionOffsets = new HashMap<Integer, Long>();
+
+
+ public Map<Integer, Long> getCorrectionOffsets() {
+ return correctionOffsets;
+ }
+
+
+ public void setCorrectionOffsets(Map<Integer, Long> correctionOffsets) {
+ this.correctionOffsets = correctionOffsets;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/QueueTimeSpan.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/QueueTimeSpan.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/QueueTimeSpan.java
new file mode 100644
index 0000000..14001ec
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/QueueTimeSpan.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.protocol.body;
+
+import com.alibaba.rocketmq.common.UtilAll;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+
+import java.util.Date;
+
+
+/**
+ * @author manhong.yqd
+ */
+public class QueueTimeSpan {
+ private MessageQueue messageQueue;
+ private long minTimeStamp;
+ private long maxTimeStamp;
+ private long consumeTimeStamp;
+ private long delayTime;
+
+
+ public MessageQueue getMessageQueue() {
+ return messageQueue;
+ }
+
+
+ public void setMessageQueue(MessageQueue messageQueue) {
+ this.messageQueue = messageQueue;
+ }
+
+
+ public long getMinTimeStamp() {
+ return minTimeStamp;
+ }
+
+
+ public void setMinTimeStamp(long minTimeStamp) {
+ this.minTimeStamp = minTimeStamp;
+ }
+
+
+ public long getMaxTimeStamp() {
+ return maxTimeStamp;
+ }
+
+
+ public void setMaxTimeStamp(long maxTimeStamp) {
+ this.maxTimeStamp = maxTimeStamp;
+ }
+
+
+ public long getConsumeTimeStamp() {
+ return consumeTimeStamp;
+ }
+
+
+ public void setConsumeTimeStamp(long consumeTimeStamp) {
+ this.consumeTimeStamp = consumeTimeStamp;
+ }
+
+
+ public String getMinTimeStampStr() {
+ return UtilAll.formatDate(new Date(minTimeStamp), UtilAll.YYYY_MM_DD_HH_MM_SS_SSS);
+ }
+
+
+ public String getMaxTimeStampStr() {
+ return UtilAll.formatDate(new Date(maxTimeStamp), UtilAll.YYYY_MM_DD_HH_MM_SS_SSS);
+ }
+
+
+ public String getConsumeTimeStampStr() {
+ return UtilAll.formatDate(new Date(consumeTimeStamp), UtilAll.YYYY_MM_DD_HH_MM_SS_SSS);
+ }
+
+
+ public long getDelayTime() {
+ return delayTime;
+ }
+
+
+ public void setDelayTime(long delayTime) {
+ this.delayTime = delayTime;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/RegisterBrokerBody.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/RegisterBrokerBody.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/RegisterBrokerBody.java
new file mode 100644
index 0000000..364bbcb
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/RegisterBrokerBody.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.protocol.body;
+
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+public class RegisterBrokerBody extends RemotingSerializable {
+ private TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
+ private List<String> filterServerList = new ArrayList<String>();
+
+
+ public TopicConfigSerializeWrapper getTopicConfigSerializeWrapper() {
+ return topicConfigSerializeWrapper;
+ }
+
+
+ public void setTopicConfigSerializeWrapper(TopicConfigSerializeWrapper topicConfigSerializeWrapper) {
+ this.topicConfigSerializeWrapper = topicConfigSerializeWrapper;
+ }
+
+
+ public List<String> getFilterServerList() {
+ return filterServerList;
+ }
+
+
+ public void setFilterServerList(List<String> filterServerList) {
+ this.filterServerList = filterServerList;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ResetOffsetBody.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ResetOffsetBody.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ResetOffsetBody.java
new file mode 100644
index 0000000..2122e61
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ResetOffsetBody.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.protocol.body;
+
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.Map;
+
+
+/**
+ * @author manhong.yqd
+ *
+ */
+public class ResetOffsetBody extends RemotingSerializable {
+ private Map<MessageQueue, Long> offsetTable;
+
+
+ public Map<MessageQueue, Long> getOffsetTable() {
+ return offsetTable;
+ }
+
+
+ public void setOffsetTable(Map<MessageQueue, Long> offsetTable) {
+ this.offsetTable = offsetTable;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ResetOffsetBodyForC.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ResetOffsetBodyForC.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ResetOffsetBodyForC.java
new file mode 100644
index 0000000..fb7360e
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/ResetOffsetBodyForC.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.protocol.body;
+
+import com.alibaba.rocketmq.common.message.MessageQueueForC;
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.List;
+
+public class ResetOffsetBodyForC extends RemotingSerializable {
+
+ private List<MessageQueueForC> offsetTable;
+
+
+ public List<MessageQueueForC> getOffsetTable() {
+ return offsetTable;
+ }
+
+
+ public void setOffsetTable(List<MessageQueueForC> offsetTable) {
+ this.offsetTable = offsetTable;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/SubscriptionGroupWrapper.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/SubscriptionGroupWrapper.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/SubscriptionGroupWrapper.java
new file mode 100644
index 0000000..096672c
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/SubscriptionGroupWrapper.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.protocol.body;
+
+import com.alibaba.rocketmq.common.DataVersion;
+import com.alibaba.rocketmq.common.subscription.SubscriptionGroupConfig;
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * @author manhong.yqd
+ */
+public class SubscriptionGroupWrapper extends RemotingSerializable {
+ private ConcurrentHashMap<String, SubscriptionGroupConfig> subscriptionGroupTable =
+ new ConcurrentHashMap<String, SubscriptionGroupConfig>(1024);
+ private DataVersion dataVersion = new DataVersion();
+
+
+ public ConcurrentHashMap<String, SubscriptionGroupConfig> getSubscriptionGroupTable() {
+ return subscriptionGroupTable;
+ }
+
+
+ public void setSubscriptionGroupTable(
+ ConcurrentHashMap<String, SubscriptionGroupConfig> subscriptionGroupTable) {
+ this.subscriptionGroupTable = subscriptionGroupTable;
+ }
+
+
+ public DataVersion getDataVersion() {
+ return dataVersion;
+ }
+
+
+ public void setDataVersion(DataVersion dataVersion) {
+ this.dataVersion = dataVersion;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java
new file mode 100644
index 0000000..0050762
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.protocol.body;
+
+import com.alibaba.rocketmq.common.DataVersion;
+import com.alibaba.rocketmq.common.TopicConfig;
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+
+public class TopicConfigSerializeWrapper extends RemotingSerializable {
+ private ConcurrentHashMap<String, TopicConfig> topicConfigTable =
+ new ConcurrentHashMap<String, TopicConfig>();
+ private DataVersion dataVersion = new DataVersion();
+
+
+ public ConcurrentHashMap<String, TopicConfig> getTopicConfigTable() {
+ return topicConfigTable;
+ }
+
+
+ public void setTopicConfigTable(ConcurrentHashMap<String, TopicConfig> topicConfigTable) {
+ this.topicConfigTable = topicConfigTable;
+ }
+
+
+ public DataVersion getDataVersion() {
+ return dataVersion;
+ }
+
+
+ public void setDataVersion(DataVersion dataVersion) {
+ this.dataVersion = dataVersion;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/TopicList.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/TopicList.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/TopicList.java
new file mode 100644
index 0000000..84912ce
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/TopicList.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.common.protocol.body;
+
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.HashSet;
+import java.util.Set;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class TopicList extends RemotingSerializable {
+ private Set<String> topicList = new HashSet<String>();
+ private String brokerAddr;
+
+
+ public Set<String> getTopicList() {
+ return topicList;
+ }
+
+
+ public void setTopicList(Set<String> topicList) {
+ this.topicList = topicList;
+ }
+
+
+ public String getBrokerAddr() {
+ return brokerAddr;
+ }
+
+
+ public void setBrokerAddr(String brokerAddr) {
+ this.brokerAddr = brokerAddr;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/UnlockBatchRequestBody.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/UnlockBatchRequestBody.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/UnlockBatchRequestBody.java
new file mode 100644
index 0000000..542b797
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/body/UnlockBatchRequestBody.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.protocol.body;
+
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.HashSet;
+import java.util.Set;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class UnlockBatchRequestBody extends RemotingSerializable {
+ private String consumerGroup;
+ private String clientId;
+ private Set<MessageQueue> mqSet = new HashSet<MessageQueue>();
+
+
+ public String getConsumerGroup() {
+ return consumerGroup;
+ }
+
+
+ public void setConsumerGroup(String consumerGroup) {
+ this.consumerGroup = consumerGroup;
+ }
+
+
+ public String getClientId() {
+ return clientId;
+ }
+
+
+ public void setClientId(String clientId) {
+ this.clientId = clientId;
+ }
+
+
+ public Set<MessageQueue> getMqSet() {
+ return mqSet;
+ }
+
+
+ public void setMqSet(Set<MessageQueue> mqSet) {
+ this.mqSet = mqSet;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/CheckTransactionStateRequestHeader.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/CheckTransactionStateRequestHeader.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/CheckTransactionStateRequestHeader.java
new file mode 100644
index 0000000..37d6a7f
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/CheckTransactionStateRequestHeader.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * $Id: EndTransactionRequestHeader.java 1835 2013-05-16 02:00:50Z shijia.wxr $
+ */
+package com.alibaba.rocketmq.common.protocol.header;
+
+import com.alibaba.rocketmq.remoting.CommandCustomHeader;
+import com.alibaba.rocketmq.remoting.annotation.CFNotNull;
+import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class CheckTransactionStateRequestHeader implements CommandCustomHeader {
+ @CFNotNull
+ private Long tranStateTableOffset;
+ @CFNotNull
+ private Long commitLogOffset;
+ private String msgId;
+ private String transactionId;
+
+
+ @Override
+ public void checkFields() throws RemotingCommandException {
+ }
+
+
+ public Long getTranStateTableOffset() {
+ return tranStateTableOffset;
+ }
+
+
+ public void setTranStateTableOffset(Long tranStateTableOffset) {
+ this.tranStateTableOffset = tranStateTableOffset;
+ }
+
+
+ public Long getCommitLogOffset() {
+ return commitLogOffset;
+ }
+
+
+ public void setCommitLogOffset(Long commitLogOffset) {
+ this.commitLogOffset = commitLogOffset;
+ }
+
+ public String getMsgId() {
+ return msgId;
+ }
+
+ public void setMsgId(String msgId) {
+ this.msgId = msgId;
+ }
+
+ public String getTransactionId() {
+ return transactionId;
+ }
+
+ public void setTransactionId(String transactionId) {
+ this.transactionId = transactionId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/CheckTransactionStateResponseHeader.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/CheckTransactionStateResponseHeader.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/CheckTransactionStateResponseHeader.java
new file mode 100644
index 0000000..76c9732
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/CheckTransactionStateResponseHeader.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * $Id: EndTransactionResponseHeader.java 1835 2013-05-16 02:00:50Z shijia.wxr $
+ */
+package com.alibaba.rocketmq.common.protocol.header;
+
+import com.alibaba.rocketmq.common.sysflag.MessageSysFlag;
+import com.alibaba.rocketmq.remoting.CommandCustomHeader;
+import com.alibaba.rocketmq.remoting.annotation.CFNotNull;
+import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class CheckTransactionStateResponseHeader implements CommandCustomHeader {
+ @CFNotNull
+ private String producerGroup;
+ @CFNotNull
+ private Long tranStateTableOffset;
+ @CFNotNull
+ private Long commitLogOffset;
+ @CFNotNull
+ private Integer commitOrRollback; // TRANSACTION_COMMIT_TYPE
+
+
+ // TRANSACTION_ROLLBACK_TYPE
+
+ @Override
+ public void checkFields() throws RemotingCommandException {
+ if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == this.commitOrRollback) {
+ return;
+ }
+
+ if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == this.commitOrRollback) {
+ return;
+ }
+
+ throw new RemotingCommandException("commitOrRollback field wrong");
+ }
+
+
+ public String getProducerGroup() {
+ return producerGroup;
+ }
+
+
+ public void setProducerGroup(String producerGroup) {
+ this.producerGroup = producerGroup;
+ }
+
+
+ public Long getTranStateTableOffset() {
+ return tranStateTableOffset;
+ }
+
+
+ public void setTranStateTableOffset(Long tranStateTableOffset) {
+ this.tranStateTableOffset = tranStateTableOffset;
+ }
+
+
+ public Long getCommitLogOffset() {
+ return commitLogOffset;
+ }
+
+
+ public void setCommitLogOffset(Long commitLogOffset) {
+ this.commitLogOffset = commitLogOffset;
+ }
+
+
+ public Integer getCommitOrRollback() {
+ return commitOrRollback;
+ }
+
+
+ public void setCommitOrRollback(Integer commitOrRollback) {
+ this.commitOrRollback = commitOrRollback;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/CloneGroupOffsetRequestHeader.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/CloneGroupOffsetRequestHeader.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/CloneGroupOffsetRequestHeader.java
new file mode 100644
index 0000000..6043229
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/CloneGroupOffsetRequestHeader.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * $Id: DeleteTopicRequestHeader.java 1835 2013-05-16 02:00:50Z shijia.wxr $
+ */
+package com.alibaba.rocketmq.common.protocol.header;
+
+import com.alibaba.rocketmq.remoting.CommandCustomHeader;
+import com.alibaba.rocketmq.remoting.annotation.CFNotNull;
+import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
+
+
+/**
+ * @author manhong.yqd
+ */
+public class CloneGroupOffsetRequestHeader implements CommandCustomHeader {
+ @CFNotNull
+ private String srcGroup;
+ @CFNotNull
+ private String destGroup;
+ private String topic;
+ private boolean offline;
+
+
+ @Override
+ public void checkFields() throws RemotingCommandException {
+ }
+
+
+ public String getDestGroup() {
+ return destGroup;
+ }
+
+
+ public void setDestGroup(String destGroup) {
+ this.destGroup = destGroup;
+ }
+
+
+ public String getTopic() {
+ return topic;
+ }
+
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+
+ public String getSrcGroup() {
+
+ return srcGroup;
+ }
+
+
+ public void setSrcGroup(String srcGroup) {
+ this.srcGroup = srcGroup;
+ }
+
+
+ public boolean isOffline() {
+ return offline;
+ }
+
+
+ public void setOffline(boolean offline) {
+ this.offline = offline;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/ConsumeMessageDirectlyResultRequestHeader.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/ConsumeMessageDirectlyResultRequestHeader.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/ConsumeMessageDirectlyResultRequestHeader.java
new file mode 100644
index 0000000..3c68636
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/ConsumeMessageDirectlyResultRequestHeader.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.protocol.header;
+
+import com.alibaba.rocketmq.remoting.CommandCustomHeader;
+import com.alibaba.rocketmq.remoting.annotation.CFNotNull;
+import com.alibaba.rocketmq.remoting.annotation.CFNullable;
+import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
+
+
+public class ConsumeMessageDirectlyResultRequestHeader implements CommandCustomHeader {
+ @CFNotNull
+ private String consumerGroup;
+ @CFNullable
+ private String clientId;
+ @CFNullable
+ private String msgId;
+ @CFNullable
+ private String brokerName;
+
+
+ @Override
+ public void checkFields() throws RemotingCommandException {
+ }
+
+
+ public String getConsumerGroup() {
+ return consumerGroup;
+ }
+
+
+ public void setConsumerGroup(String consumerGroup) {
+ this.consumerGroup = consumerGroup;
+ }
+
+
+ public String getBrokerName() {
+ return brokerName;
+ }
+
+
+ public void setBrokerName(String brokerName) {
+ this.brokerName = brokerName;
+ }
+
+
+ public String getClientId() {
+ return clientId;
+ }
+
+
+ public void setClientId(String clientId) {
+ this.clientId = clientId;
+ }
+
+
+ public String getMsgId() {
+ return msgId;
+ }
+
+
+ public void setMsgId(String msgId) {
+ this.msgId = msgId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/ConsumerSendMsgBackRequestHeader.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/ConsumerSendMsgBackRequestHeader.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/ConsumerSendMsgBackRequestHeader.java
new file mode 100644
index 0000000..c0acf88
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/ConsumerSendMsgBackRequestHeader.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.protocol.header;
+
+import com.alibaba.rocketmq.remoting.CommandCustomHeader;
+import com.alibaba.rocketmq.remoting.annotation.CFNotNull;
+import com.alibaba.rocketmq.remoting.annotation.CFNullable;
+import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
+
+
+/**
+ * @author shijia.wxr
+ *
+ */
+public class ConsumerSendMsgBackRequestHeader implements CommandCustomHeader {
+ @CFNotNull
+ private Long offset;
+ @CFNotNull
+ private String group;
+ @CFNotNull
+ private Integer delayLevel;
+ private String originMsgId;
+ private String originTopic;
+ @CFNullable
+ private boolean unitMode = false;
+ private Integer maxReconsumeTimes;
+
+
+ @Override
+ public void checkFields() throws RemotingCommandException {
+
+ }
+
+
+ public Long getOffset() {
+ return offset;
+ }
+
+
+ public void setOffset(Long offset) {
+ this.offset = offset;
+ }
+
+
+ public String getGroup() {
+ return group;
+ }
+
+
+ public void setGroup(String group) {
+ this.group = group;
+ }
+
+
+ public Integer getDelayLevel() {
+ return delayLevel;
+ }
+
+
+ public void setDelayLevel(Integer delayLevel) {
+ this.delayLevel = delayLevel;
+ }
+
+
+ public String getOriginMsgId() {
+ return originMsgId;
+ }
+
+
+ public void setOriginMsgId(String originMsgId) {
+ this.originMsgId = originMsgId;
+ }
+
+
+ public String getOriginTopic() {
+ return originTopic;
+ }
+
+
+ public void setOriginTopic(String originTopic) {
+ this.originTopic = originTopic;
+ }
+
+
+ public boolean isUnitMode() {
+ return unitMode;
+ }
+
+
+ public void setUnitMode(boolean unitMode) {
+ this.unitMode = unitMode;
+ }
+
+
+ public Integer getMaxReconsumeTimes() {
+ return maxReconsumeTimes;
+ }
+
+
+ public void setMaxReconsumeTimes(final Integer maxReconsumeTimes) {
+ this.maxReconsumeTimes = maxReconsumeTimes;
+ }
+
+
+ @Override
+ public String toString() {
+ return "ConsumerSendMsgBackRequestHeader [group=" + group + ", originTopic=" + originTopic + ", originMsgId=" + originMsgId
+ + ", delayLevel=" + delayLevel + ", unitMode=" + unitMode + ", maxReconsumeTimes=" + maxReconsumeTimes + "]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/CreateTopicRequestHeader.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/CreateTopicRequestHeader.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/CreateTopicRequestHeader.java
new file mode 100644
index 0000000..a9d219c
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/CreateTopicRequestHeader.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * $Id: CreateTopicRequestHeader.java 1835 2013-05-16 02:00:50Z shijia.wxr $
+ */
+package com.alibaba.rocketmq.common.protocol.header;
+
+import com.alibaba.rocketmq.common.TopicFilterType;
+import com.alibaba.rocketmq.remoting.CommandCustomHeader;
+import com.alibaba.rocketmq.remoting.annotation.CFNotNull;
+import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class CreateTopicRequestHeader implements CommandCustomHeader {
+ @CFNotNull
+ private String topic;
+ @CFNotNull
+ private String defaultTopic;
+ @CFNotNull
+ private Integer readQueueNums;
+ @CFNotNull
+ private Integer writeQueueNums;
+ @CFNotNull
+ private Integer perm;
+ @CFNotNull
+ private String topicFilterType;
+ private Integer topicSysFlag;
+ @CFNotNull
+ private Boolean order = false;
+
+
+ @Override
+ public void checkFields() throws RemotingCommandException {
+ try {
+ TopicFilterType.valueOf(this.topicFilterType);
+ } catch (Exception e) {
+ throw new RemotingCommandException("topicFilterType = [" + topicFilterType + "] value invalid", e);
+ }
+ }
+
+
+ public TopicFilterType getTopicFilterTypeEnum() {
+ return TopicFilterType.valueOf(this.topicFilterType);
+ }
+
+
+ public String getTopic() {
+ return topic;
+ }
+
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+
+ public String getDefaultTopic() {
+ return defaultTopic;
+ }
+
+
+ public void setDefaultTopic(String defaultTopic) {
+ this.defaultTopic = defaultTopic;
+ }
+
+
+ public Integer getReadQueueNums() {
+ return readQueueNums;
+ }
+
+
+ public void setReadQueueNums(Integer readQueueNums) {
+ this.readQueueNums = readQueueNums;
+ }
+
+
+ public Integer getWriteQueueNums() {
+ return writeQueueNums;
+ }
+
+
+ public void setWriteQueueNums(Integer writeQueueNums) {
+ this.writeQueueNums = writeQueueNums;
+ }
+
+
+ public Integer getPerm() {
+ return perm;
+ }
+
+
+ public void setPerm(Integer perm) {
+ this.perm = perm;
+ }
+
+
+ public String getTopicFilterType() {
+ return topicFilterType;
+ }
+
+
+ public void setTopicFilterType(String topicFilterType) {
+ this.topicFilterType = topicFilterType;
+ }
+
+
+ public Integer getTopicSysFlag() {
+ return topicSysFlag;
+ }
+
+
+ public void setTopicSysFlag(Integer topicSysFlag) {
+ this.topicSysFlag = topicSysFlag;
+ }
+
+
+ public Boolean getOrder() {
+ return order;
+ }
+
+
+ public void setOrder(Boolean order) {
+ this.order = order;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/DeleteSubscriptionGroupRequestHeader.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/DeleteSubscriptionGroupRequestHeader.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/DeleteSubscriptionGroupRequestHeader.java
new file mode 100644
index 0000000..9307c01
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/DeleteSubscriptionGroupRequestHeader.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.protocol.header;
+
+import com.alibaba.rocketmq.remoting.CommandCustomHeader;
+import com.alibaba.rocketmq.remoting.annotation.CFNotNull;
+import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
+
+
+/**
+ * @author manhong.yqd
+ */
+public class DeleteSubscriptionGroupRequestHeader implements CommandCustomHeader {
+ @CFNotNull
+ private String groupName;
+
+
+ @Override
+ public void checkFields() throws RemotingCommandException {
+ }
+
+
+ public String getGroupName() {
+ return groupName;
+ }
+
+
+ public void setGroupName(String groupName) {
+ this.groupName = groupName;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/DeleteTopicRequestHeader.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/DeleteTopicRequestHeader.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/DeleteTopicRequestHeader.java
new file mode 100644
index 0000000..4b1a844
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/header/DeleteTopicRequestHeader.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * $Id: DeleteTopicRequestHeader.java 1835 2013-05-16 02:00:50Z shijia.wxr $
+ */
+package com.alibaba.rocketmq.common.protocol.header;
+
+import com.alibaba.rocketmq.remoting.CommandCustomHeader;
+import com.alibaba.rocketmq.remoting.annotation.CFNotNull;
+import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class DeleteTopicRequestHeader implements CommandCustomHeader {
+ @CFNotNull
+ private String topic;
+
+
+ @Override
+ public void checkFields() throws RemotingCommandException {
+ }
+
+
+ public String getTopic() {
+ return topic;
+ }
+
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+}