You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 09:14:01 UTC
[10/99] [abbrv] [partial] incubator-rocketmq git commit: ROCKETMQ-18
Rename package name from com.alibaba to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/message/MessageExt.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageExt.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageExt.java
new file mode 100644
index 0000000..db50672
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageExt.java
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.message;
+
+import org.apache.rocketmq.common.TopicFilterType;
+import org.apache.rocketmq.common.sysflag.MessageSysFlag;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class MessageExt extends Message {
+ private static final long serialVersionUID = 5720810158625748049L;
+
+ private int queueId;
+
+ private int storeSize;
+
+ private long queueOffset;
+ private int sysFlag;
+ private long bornTimestamp;
+ private SocketAddress bornHost;
+
+ private long storeTimestamp;
+ private SocketAddress storeHost;
+ private String msgId;
+ private long commitLogOffset;
+ private int bodyCRC;
+ private int reconsumeTimes;
+
+ private long preparedTransactionOffset;
+
+
+ public MessageExt() {
+ }
+
+
+ public MessageExt(int queueId, long bornTimestamp, SocketAddress bornHost, long storeTimestamp,
+ SocketAddress storeHost, String msgId) {
+ this.queueId = queueId;
+ this.bornTimestamp = bornTimestamp;
+ this.bornHost = bornHost;
+ this.storeTimestamp = storeTimestamp;
+ this.storeHost = storeHost;
+ this.msgId = msgId;
+ }
+
+ public static TopicFilterType parseTopicFilterType(final int sysFlag) {
+ if ((sysFlag & MessageSysFlag.MULTI_TAGS_FLAG) == MessageSysFlag.MULTI_TAGS_FLAG) {
+ return TopicFilterType.MULTI_TAG;
+ }
+
+ return TopicFilterType.SINGLE_TAG;
+ }
+
+ public ByteBuffer getBornHostBytes() {
+ return socketAddress2ByteBuffer(this.bornHost);
+ }
+
+ public ByteBuffer getBornHostBytes(ByteBuffer byteBuffer) {
+ return socketAddress2ByteBuffer(this.bornHost, byteBuffer);
+ }
+
+ private static ByteBuffer socketAddress2ByteBuffer(final SocketAddress socketAddress, final ByteBuffer byteBuffer) {
+ InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;
+ byteBuffer.put(inetSocketAddress.getAddress().getAddress(), 0, 4);
+ byteBuffer.putInt(inetSocketAddress.getPort());
+ byteBuffer.flip();
+ return byteBuffer;
+ }
+
+ public static ByteBuffer socketAddress2ByteBuffer(SocketAddress socketAddress) {
+ ByteBuffer byteBuffer = ByteBuffer.allocate(8);
+ return socketAddress2ByteBuffer(socketAddress, byteBuffer);
+ }
+
+ public ByteBuffer getStoreHostBytes() {
+ return socketAddress2ByteBuffer(this.storeHost);
+ }
+
+ public ByteBuffer getStoreHostBytes(ByteBuffer byteBuffer) {
+ return socketAddress2ByteBuffer(this.storeHost, byteBuffer);
+ }
+
+ public int getQueueId() {
+ return queueId;
+ }
+
+ public void setQueueId(int queueId) {
+ this.queueId = queueId;
+ }
+
+ public long getBornTimestamp() {
+ return bornTimestamp;
+ }
+
+ public void setBornTimestamp(long bornTimestamp) {
+ this.bornTimestamp = bornTimestamp;
+ }
+
+ public SocketAddress getBornHost() {
+ return bornHost;
+ }
+
+ public void setBornHost(SocketAddress bornHost) {
+ this.bornHost = bornHost;
+ }
+
+ public String getBornHostString() {
+ if (this.bornHost != null) {
+ InetSocketAddress inetSocketAddress = (InetSocketAddress) this.bornHost;
+ return inetSocketAddress.getAddress().getHostAddress();
+ }
+
+ return null;
+ }
+
+ public String getBornHostNameString() {
+ if (this.bornHost != null) {
+ InetSocketAddress inetSocketAddress = (InetSocketAddress) this.bornHost;
+ return inetSocketAddress.getAddress().getHostName();
+ }
+
+ return null;
+ }
+
+ public long getStoreTimestamp() {
+ return storeTimestamp;
+ }
+
+ public void setStoreTimestamp(long storeTimestamp) {
+ this.storeTimestamp = storeTimestamp;
+ }
+
+ public SocketAddress getStoreHost() {
+ return storeHost;
+ }
+
+ public void setStoreHost(SocketAddress storeHost) {
+ this.storeHost = storeHost;
+ }
+
+ public String getMsgId() {
+ return msgId;
+ }
+
+ public void setMsgId(String msgId) {
+ this.msgId = msgId;
+ }
+
+ public int getSysFlag() {
+ return sysFlag;
+ }
+
+ public void setSysFlag(int sysFlag) {
+ this.sysFlag = sysFlag;
+ }
+
+ public int getBodyCRC() {
+ return bodyCRC;
+ }
+
+ public void setBodyCRC(int bodyCRC) {
+ this.bodyCRC = bodyCRC;
+ }
+
+ public long getQueueOffset() {
+ return queueOffset;
+ }
+
+ public void setQueueOffset(long queueOffset) {
+ this.queueOffset = queueOffset;
+ }
+
+ public long getCommitLogOffset() {
+ return commitLogOffset;
+ }
+
+ public void setCommitLogOffset(long physicOffset) {
+ this.commitLogOffset = physicOffset;
+ }
+
+ public int getStoreSize() {
+ return storeSize;
+ }
+
+ public void setStoreSize(int storeSize) {
+ this.storeSize = storeSize;
+ }
+
+ public int getReconsumeTimes() {
+ return reconsumeTimes;
+ }
+
+
+ public void setReconsumeTimes(int reconsumeTimes) {
+ this.reconsumeTimes = reconsumeTimes;
+ }
+
+
+ public long getPreparedTransactionOffset() {
+ return preparedTransactionOffset;
+ }
+
+
+ public void setPreparedTransactionOffset(long preparedTransactionOffset) {
+ this.preparedTransactionOffset = preparedTransactionOffset;
+ }
+
+
+ @Override
+ public String toString() {
+ return "MessageExt [queueId=" + queueId + ", storeSize=" + storeSize + ", queueOffset=" + queueOffset
+ + ", sysFlag=" + sysFlag + ", bornTimestamp=" + bornTimestamp + ", bornHost=" + bornHost
+ + ", storeTimestamp=" + storeTimestamp + ", storeHost=" + storeHost + ", msgId=" + msgId
+ + ", commitLogOffset=" + commitLogOffset + ", bodyCRC=" + bodyCRC + ", reconsumeTimes="
+ + reconsumeTimes + ", preparedTransactionOffset=" + preparedTransactionOffset
+ + ", toString()=" + super.toString() + "]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/message/MessageId.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageId.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageId.java
new file mode 100644
index 0000000..95fe2f9
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageId.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.message;
+
+import java.net.SocketAddress;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class MessageId {
+ private SocketAddress address;
+ private long offset;
+
+
+ public MessageId(SocketAddress address, long offset) {
+ this.address = address;
+ this.offset = offset;
+ }
+
+
+ public SocketAddress getAddress() {
+ return address;
+ }
+
+
+ public void setAddress(SocketAddress address) {
+ this.address = address;
+ }
+
+
+ public long getOffset() {
+ return offset;
+ }
+
+
+ public void setOffset(long offset) {
+ this.offset = offset;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/message/MessageQueue.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageQueue.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageQueue.java
new file mode 100644
index 0000000..3c341e6
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageQueue.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.message;
+
+import java.io.Serializable;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class MessageQueue implements Comparable<MessageQueue>, Serializable {
+ private static final long serialVersionUID = 6191200464116433425L;
+ private String topic;
+ private String brokerName;
+ private int queueId;
+
+
+ public MessageQueue() {
+
+ }
+
+
+ public MessageQueue(String topic, String brokerName, int queueId) {
+ this.topic = topic;
+ this.brokerName = brokerName;
+ this.queueId = queueId;
+ }
+
+
+ public String getTopic() {
+ return topic;
+ }
+
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+
+ public String getBrokerName() {
+ return brokerName;
+ }
+
+
+ public void setBrokerName(String brokerName) {
+ this.brokerName = brokerName;
+ }
+
+
+ public int getQueueId() {
+ return queueId;
+ }
+
+
+ public void setQueueId(int queueId) {
+ this.queueId = queueId;
+ }
+
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((brokerName == null) ? 0 : brokerName.hashCode());
+ result = prime * result + queueId;
+ result = prime * result + ((topic == null) ? 0 : topic.hashCode());
+ return result;
+ }
+
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ MessageQueue other = (MessageQueue) obj;
+ if (brokerName == null) {
+ if (other.brokerName != null)
+ return false;
+ } else if (!brokerName.equals(other.brokerName))
+ return false;
+ if (queueId != other.queueId)
+ return false;
+ if (topic == null) {
+ if (other.topic != null)
+ return false;
+ } else if (!topic.equals(other.topic))
+ return false;
+ return true;
+ }
+
+
+ @Override
+ public String toString() {
+ return "MessageQueue [topic=" + topic + ", brokerName=" + brokerName + ", queueId=" + queueId + "]";
+ }
+
+
+ @Override
+ public int compareTo(MessageQueue o) {
+ {
+ int result = this.topic.compareTo(o.topic);
+ if (result != 0) {
+ return result;
+ }
+ }
+
+ {
+ int result = this.brokerName.compareTo(o.brokerName);
+ if (result != 0) {
+ return result;
+ }
+ }
+
+ return this.queueId - o.queueId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/message/MessageQueueForC.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageQueueForC.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageQueueForC.java
new file mode 100644
index 0000000..27b5e07
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageQueueForC.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.message;
+
+import java.io.Serializable;
+
+
+/**
+ * @author lansheng.zj
+ */
+public class MessageQueueForC implements Comparable<MessageQueueForC>, Serializable {
+
+ private static final long serialVersionUID = 5320967846569962104L;
+ private String topic;
+ private String brokerName;
+ private int queueId;
+ private long offset;
+
+
+ public MessageQueueForC(String topic, String brokerName, int queueId, long offset) {
+ this.topic = topic;
+ this.brokerName = brokerName;
+ this.queueId = queueId;
+ this.offset = offset;
+ }
+
+
+ @Override
+ public int compareTo(MessageQueueForC o) {
+ int result = this.topic.compareTo(o.topic);
+ if (result != 0) {
+ return result;
+ }
+ result = this.brokerName.compareTo(o.brokerName);
+ if (result != 0) {
+ return result;
+ }
+ result = this.queueId - o.queueId;
+ if (result != 0) {
+ return result;
+ }
+ if ((this.offset - o.offset) > 0) {
+ return 1;
+ } else if ((this.offset - o.offset) == 0) {
+ return 0;
+ } else {
+ return -1;
+ }
+ }
+
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((brokerName == null) ? 0 : brokerName.hashCode());
+ result = prime * result + queueId;
+ result = prime * result + ((topic == null) ? 0 : topic.hashCode());
+ return result;
+ }
+
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ MessageQueueForC other = (MessageQueueForC) obj;
+ if (brokerName == null) {
+ if (other.brokerName != null)
+ return false;
+ } else if (!brokerName.equals(other.brokerName))
+ return false;
+ if (queueId != other.queueId)
+ return false;
+ if (topic == null) {
+ if (other.topic != null)
+ return false;
+ } else if (!topic.equals(other.topic))
+ return false;
+
+ if (offset != other.offset) {
+ return false;
+ }
+ return true;
+ }
+
+
+ @Override
+ public String toString() {
+ return "MessageQueueForC [topic=" + topic + ", brokerName=" + brokerName + ", queueId=" + queueId
+ + ", offset=" + offset + "]";
+ }
+
+
+ public String getTopic() {
+ return topic;
+ }
+
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+
+ public String getBrokerName() {
+ return brokerName;
+ }
+
+
+ public void setBrokerName(String brokerName) {
+ this.brokerName = brokerName;
+ }
+
+
+ public int getQueueId() {
+ return queueId;
+ }
+
+
+ public void setQueueId(int queueId) {
+ this.queueId = queueId;
+ }
+
+
+ public long getOffset() {
+ return offset;
+ }
+
+
+ public void setOffset(long offset) {
+ this.offset = offset;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/message/MessageType.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageType.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageType.java
new file mode 100644
index 0000000..a1913a5
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageType.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.message;
+
+public enum MessageType {
+ Normal_Msg,
+ Trans_Msg_Half,
+ Trans_msg_Commit,
+ Delay_Msg,
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java b/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java
new file mode 100644
index 0000000..d71e6b0
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvConfig.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * $Id: NamesrvConfig.java 1839 2013-05-16 02:12:02Z shijia.wxr $
+ */
+package org.apache.rocketmq.common.namesrv;
+
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+
+
+/**
+ *
+ * @author shijia.wxr
+ * @author lansheng.zj
+ */
+public class NamesrvConfig {
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
+ private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
+
+ private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
+ private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
+ private String productEnvName = "center";
+ private boolean clusterTest = false;
+ private boolean orderMessageEnable = false;
+
+ public boolean isOrderMessageEnable() {
+ return orderMessageEnable;
+ }
+
+ public void setOrderMessageEnable(boolean orderMessageEnable) {
+ this.orderMessageEnable = orderMessageEnable;
+ }
+
+ public String getRocketmqHome() {
+ return rocketmqHome;
+ }
+
+
+ public void setRocketmqHome(String rocketmqHome) {
+ this.rocketmqHome = rocketmqHome;
+ }
+
+
+ public String getKvConfigPath() {
+ return kvConfigPath;
+ }
+
+
+ public void setKvConfigPath(String kvConfigPath) {
+ this.kvConfigPath = kvConfigPath;
+ }
+
+
+ public String getProductEnvName() {
+ return productEnvName;
+ }
+
+
+ public void setProductEnvName(String productEnvName) {
+ this.productEnvName = productEnvName;
+ }
+
+
+ public boolean isClusterTest() {
+ return clusterTest;
+ }
+
+
+ public void setClusterTest(boolean clusterTest) {
+ this.clusterTest = clusterTest;
+ }
+
+ public String getConfigStorePath() {
+ return configStorePath;
+ }
+
+ public void setConfigStorePath(final String configStorePath) {
+ this.configStorePath = configStorePath;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvUtil.java b/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvUtil.java
new file mode 100644
index 0000000..fb854f8
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/namesrv/NamesrvUtil.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.namesrv;
+
+/**
+ * @author shijia.wxr
+ */
+public class NamesrvUtil {
+ public static final String NAMESPACE_ORDER_TOPIC_CONFIG = "ORDER_TOPIC_CONFIG";
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/namesrv/RegisterBrokerResult.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/namesrv/RegisterBrokerResult.java b/common/src/main/java/org/apache/rocketmq/common/namesrv/RegisterBrokerResult.java
new file mode 100644
index 0000000..10c811e
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/namesrv/RegisterBrokerResult.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.namesrv;
+
+import org.apache.rocketmq.common.protocol.body.KVTable;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class RegisterBrokerResult {
+ private String haServerAddr;
+ private String masterAddr;
+ private KVTable kvTable;
+
+
+ public String getHaServerAddr() {
+ return haServerAddr;
+ }
+
+
+ public void setHaServerAddr(String haServerAddr) {
+ this.haServerAddr = haServerAddr;
+ }
+
+
+ public String getMasterAddr() {
+ return masterAddr;
+ }
+
+
+ public void setMasterAddr(String masterAddr) {
+ this.masterAddr = masterAddr;
+ }
+
+
+ public KVTable getKvTable() {
+ return kvTable;
+ }
+
+
+ public void setKvTable(KVTable kvTable) {
+ this.kvTable = kvTable;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/namesrv/TopAddressing.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/namesrv/TopAddressing.java b/common/src/main/java/org/apache/rocketmq/common/namesrv/TopAddressing.java
new file mode 100644
index 0000000..5836c05
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/namesrv/TopAddressing.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * $Id: TopAddressing.java 1831 2013-05-16 01:39:51Z shijia.wxr $
+ */
+package org.apache.rocketmq.common.namesrv;
+
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.help.FAQUrl;
+import org.apache.rocketmq.common.utils.HttpTinyClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+
+/**
+ * @author shijia.wxr
+ * @author manhong.yqd
+ */
+public class TopAddressing {
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
+ private String nsAddr;
+ private String wsAddr;
+ private String unitName;
+
+
+ public TopAddressing(final String wsAddr) {
+ this(wsAddr, null);
+ }
+
+
+ public TopAddressing(final String wsAddr, final String unitName) {
+ this.wsAddr = wsAddr;
+ this.unitName = unitName;
+ }
+
+ public final String fetchNSAddr() {
+ return fetchNSAddr(true, 3000);
+ }
+
+ public final String fetchNSAddr(boolean verbose, long timeoutMills) {
+ String url = this.wsAddr;
+ try {
+ if (!UtilAll.isBlank(this.unitName)) {
+ url = url + "-" + this.unitName + "?nofix=1";
+ }
+ HttpTinyClient.HttpResult result = HttpTinyClient.httpGet(url, null, null, "UTF-8", timeoutMills);
+ if (200 == result.code) {
+ String responseStr = result.content;
+ if (responseStr != null) {
+ return clearNewLine(responseStr);
+ } else {
+ log.error("fetch nameserver address is null");
+ }
+ } else {
+ log.error("fetch nameserver address failed. statusCode={}", result.code);
+ }
+ } catch (IOException e) {
+ if (verbose) {
+ log.error("fetch name server address exception", e);
+ }
+ }
+
+ if (verbose) {
+ String errorMsg =
+ "connect to " + url + " failed, maybe the domain name " + MixAll.WS_DOMAIN_NAME + " not bind in /etc/hosts";
+ errorMsg += FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL);
+
+ log.warn(errorMsg);
+ }
+ return null;
+ }
+
+ private static String clearNewLine(final String str) {
+ String newString = str.trim();
+ int index = newString.indexOf("\r");
+ if (index != -1) {
+ return newString.substring(0, index);
+ }
+
+ index = newString.indexOf("\n");
+ if (index != -1) {
+ return newString.substring(0, index);
+ }
+
+ return newString;
+ }
+
+ public String getNsAddr() {
+ return nsAddr;
+ }
+
+
+ public void setNsAddr(String nsAddr) {
+ this.nsAddr = nsAddr;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/MQProtosHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/MQProtosHelper.java b/common/src/main/java/org/apache/rocketmq/common/protocol/MQProtosHelper.java
new file mode 100644
index 0000000..44e2e4f
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/MQProtosHelper.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol;
+
+import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class MQProtosHelper {
+ public static boolean registerBrokerToNameServer(final String nsaddr, final String brokerAddr,
+ final long timeoutMillis) {
+ RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
+ requestHeader.setBrokerAddr(brokerAddr);
+
+ RemotingCommand request =
+ RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
+
+ try {
+ RemotingCommand response = RemotingHelper.invokeSync(nsaddr, request, timeoutMillis);
+ if (response != null) {
+ return ResponseCode.SUCCESS == response.getCode();
+ }
+ } catch (RemotingConnectException e) {
+ e.printStackTrace();
+ } catch (RemotingSendRequestException e) {
+ e.printStackTrace();
+ } catch (RemotingTimeoutException e) {
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
new file mode 100644
index 0000000..d878726
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol;
+
+public class RequestCode {
+
+ public static final int SEND_MESSAGE = 10;
+
+ public static final int PULL_MESSAGE = 11;
+
+ public static final int QUERY_MESSAGE = 12;
+ public static final int QUERY_BROKER_OFFSET = 13;
+ public static final int QUERY_CONSUMER_OFFSET = 14;
+ public static final int UPDATE_CONSUMER_OFFSET = 15;
+ public static final int UPDATE_AND_CREATE_TOPIC = 17;
+ public static final int GET_ALL_TOPIC_CONFIG = 21;
+ public static final int GET_TOPIC_CONFIG_LIST = 22;
+
+ public static final int GET_TOPIC_NAME_LIST = 23;
+
+ public static final int UPDATE_BROKER_CONFIG = 25;
+
+ public static final int GET_BROKER_CONFIG = 26;
+
+ public static final int TRIGGER_DELETE_FILES = 27;
+
+ public static final int GET_BROKER_RUNTIME_INFO = 28;
+ public static final int SEARCH_OFFSET_BY_TIMESTAMP = 29;
+ public static final int GET_MAX_OFFSET = 30;
+ public static final int GET_MIN_OFFSET = 31;
+
+ public static final int GET_EARLIEST_MSG_STORETIME = 32;
+
+ public static final int VIEW_MESSAGE_BY_ID = 33;
+
+ public static final int HEART_BEAT = 34;
+
+ public static final int UNREGISTER_CLIENT = 35;
+
+ public static final int CONSUMER_SEND_MSG_BACK = 36;
+
+ public static final int END_TRANSACTION = 37;
+ public static final int GET_CONSUMER_LIST_BY_GROUP = 38;
+
+ public static final int CHECK_TRANSACTION_STATE = 39;
+
+ public static final int NOTIFY_CONSUMER_IDS_CHANGED = 40;
+
+ public static final int LOCK_BATCH_MQ = 41;
+
+ public static final int UNLOCK_BATCH_MQ = 42;
+ public static final int GET_ALL_CONSUMER_OFFSET = 43;
+
+ public static final int GET_ALL_DELAY_OFFSET = 45;
+
+ public static final int PUT_KV_CONFIG = 100;
+
+ public static final int GET_KV_CONFIG = 101;
+
+ public static final int DELETE_KV_CONFIG = 102;
+
+ public static final int REGISTER_BROKER = 103;
+
+ public static final int UNREGISTER_BROKER = 104;
+ public static final int GET_ROUTEINTO_BY_TOPIC = 105;
+
+ public static final int GET_BROKER_CLUSTER_INFO = 106;
+ public static final int UPDATE_AND_CREATE_SUBSCRIPTIONGROUP = 200;
+ public static final int GET_ALL_SUBSCRIPTIONGROUP_CONFIG = 201;
+ public static final int GET_TOPIC_STATS_INFO = 202;
+ public static final int GET_CONSUMER_CONNECTION_LIST = 203;
+ public static final int GET_PRODUCER_CONNECTION_LIST = 204;
+ public static final int WIPE_WRITE_PERM_OF_BROKER = 205;
+
+
+ public static final int GET_ALL_TOPIC_LIST_FROM_NAMESERVER = 206;
+
+ public static final int DELETE_SUBSCRIPTIONGROUP = 207;
+ public static final int GET_CONSUME_STATS = 208;
+
+ public static final int SUSPEND_CONSUMER = 209;
+
+ public static final int RESUME_CONSUMER = 210;
+ public static final int RESET_CONSUMER_OFFSET_IN_CONSUMER = 211;
+ public static final int RESET_CONSUMER_OFFSET_IN_BROKER = 212;
+
+ public static final int ADJUST_CONSUMER_THREAD_POOL = 213;
+
+ public static final int WHO_CONSUME_THE_MESSAGE = 214;
+
+
+ public static final int DELETE_TOPIC_IN_BROKER = 215;
+
+ public static final int DELETE_TOPIC_IN_NAMESRV = 216;
+ public static final int GET_KVLIST_BY_NAMESPACE = 219;
+
+
+ public static final int RESET_CONSUMER_CLIENT_OFFSET = 220;
+
+ public static final int GET_CONSUMER_STATUS_FROM_CLIENT = 221;
+
+ public static final int INVOKE_BROKER_TO_RESET_OFFSET = 222;
+
+ public static final int INVOKE_BROKER_TO_GET_CONSUMER_STATUS = 223;
+
+
+ public static final int QUERY_TOPIC_CONSUME_BY_WHO = 300;
+
+ public static final int GET_TOPICS_BY_CLUSTER = 224;
+
+ public static final int REGISTER_FILTER_SERVER = 301;
+ public static final int REGISTER_MESSAGE_FILTER_CLASS = 302;
+
+ public static final int QUERY_CONSUME_TIME_SPAN = 303;
+
+ public static final int GET_SYSTEM_TOPIC_LIST_FROM_NS = 304;
+ public static final int GET_SYSTEM_TOPIC_LIST_FROM_BROKER = 305;
+
+ public static final int CLEAN_EXPIRED_CONSUMEQUEUE = 306;
+
+ public static final int GET_CONSUMER_RUNNING_INFO = 307;
+
+ public static final int QUERY_CORRECTION_OFFSET = 308;
+ public static final int CONSUME_MESSAGE_DIRECTLY = 309;
+
+ public static final int SEND_MESSAGE_V2 = 310;
+
+ public static final int GET_UNIT_TOPIC_LIST = 311;
+
+ public static final int GET_HAS_UNIT_SUB_TOPIC_LIST = 312;
+
+ public static final int GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST = 313;
+
+ public static final int CLONE_GROUP_OFFSET = 314;
+
+ public static final int VIEW_BROKER_STATS_DATA = 315;
+
+ public static final int CLEAN_UNUSED_TOPIC = 316;
+
+ public static final int GET_BROKER_CONSUME_STATS = 317;
+
+ /**
+ * update the config of name server
+ */
+ public static final int UPDATE_NAMESRV_CONFIG = 318;
+
+ /**
+ * get config from name server
+ */
+ public static final int GET_NAMESRV_CONFIG = 319;
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
new file mode 100644
index 0000000..a5b4e2e
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol;
+
+import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode;
+
+
+public class ResponseCode extends RemotingSysResponseCode {
+
+ public static final int FLUSH_DISK_TIMEOUT = 10;
+
+ public static final int SLAVE_NOT_AVAILABLE = 11;
+
+ public static final int FLUSH_SLAVE_TIMEOUT = 12;
+
+ public static final int MESSAGE_ILLEGAL = 13;
+
+ public static final int SERVICE_NOT_AVAILABLE = 14;
+
+ public static final int VERSION_NOT_SUPPORTED = 15;
+
+ public static final int NO_PERMISSION = 16;
+
+ public static final int TOPIC_NOT_EXIST = 17;
+ public static final int TOPIC_EXIST_ALREADY = 18;
+ public static final int PULL_NOT_FOUND = 19;
+
+ public static final int PULL_RETRY_IMMEDIATELY = 20;
+
+ public static final int PULL_OFFSET_MOVED = 21;
+
+ public static final int QUERY_NOT_FOUND = 22;
+
+ public static final int SUBSCRIPTION_PARSE_FAILED = 23;
+
+ public static final int SUBSCRIPTION_NOT_EXIST = 24;
+
+ public static final int SUBSCRIPTION_NOT_LATEST = 25;
+
+ public static final int SUBSCRIPTION_GROUP_NOT_EXIST = 26;
+
+ public static final int TRANSACTION_SHOULD_COMMIT = 200;
+
+ public static final int TRANSACTION_SHOULD_ROLLBACK = 201;
+
+ public static final int TRANSACTION_STATE_UNKNOW = 202;
+
+ public static final int TRANSACTION_STATE_GROUP_WRONG = 203;
+ public static final int NO_BUYER_ID = 204;
+
+
+ public static final int NOT_IN_CURRENT_UNIT = 205;
+
+
+ public static final int CONSUMER_NOT_ONLINE = 206;
+
+
+ public static final int CONSUME_MSG_TIMEOUT = 207;
+
+
+ public static final int NO_MESSAGE = 208;
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/body/BrokerStatsData.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/BrokerStatsData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/BrokerStatsData.java
new file mode 100644
index 0000000..f1ac124
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/BrokerStatsData.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.body;
+
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+
+public class BrokerStatsData extends RemotingSerializable {
+
+ private BrokerStatsItem statsMinute;
+
+ private BrokerStatsItem statsHour;
+
+ private BrokerStatsItem statsDay;
+
+
+ public BrokerStatsItem getStatsMinute() {
+ return statsMinute;
+ }
+
+
+ public void setStatsMinute(BrokerStatsItem statsMinute) {
+ this.statsMinute = statsMinute;
+ }
+
+
+ public BrokerStatsItem getStatsHour() {
+ return statsHour;
+ }
+
+
+ public void setStatsHour(BrokerStatsItem statsHour) {
+ this.statsHour = statsHour;
+ }
+
+
+ public BrokerStatsItem getStatsDay() {
+ return statsDay;
+ }
+
+
+ public void setStatsDay(BrokerStatsItem statsDay) {
+ this.statsDay = statsDay;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/body/BrokerStatsItem.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/BrokerStatsItem.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/BrokerStatsItem.java
new file mode 100644
index 0000000..904770f
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/BrokerStatsItem.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.body;
+
+public class BrokerStatsItem {
+ private long sum;
+ private double tps;
+ private double avgpt;
+
+
+ public long getSum() {
+ return sum;
+ }
+
+
+ public void setSum(long sum) {
+ this.sum = sum;
+ }
+
+
+ public double getTps() {
+ return tps;
+ }
+
+
+ public void setTps(double tps) {
+ this.tps = tps;
+ }
+
+
+ public double getAvgpt() {
+ return avgpt;
+ }
+
+
+ public void setAvgpt(double avgpt) {
+ this.avgpt = avgpt;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/body/CMResult.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/CMResult.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/CMResult.java
new file mode 100644
index 0000000..85eda7b
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/CMResult.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.body;
+
+public enum CMResult {
+ CR_SUCCESS,
+ CR_LATER,
+ CR_ROLLBACK,
+ CR_COMMIT,
+ CR_THROW_EXCEPTION,
+ CR_RETURN_NULL,
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/body/ClusterInfo.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ClusterInfo.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ClusterInfo.java
new file mode 100644
index 0000000..461d28c
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ClusterInfo.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.body;
+
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class ClusterInfo extends RemotingSerializable {
+ private HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
+ private HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
+
+
+ public HashMap<String, BrokerData> getBrokerAddrTable() {
+ return brokerAddrTable;
+ }
+
+
+ public void setBrokerAddrTable(HashMap<String, BrokerData> brokerAddrTable) {
+ this.brokerAddrTable = brokerAddrTable;
+ }
+
+
+ public HashMap<String, Set<String>> getClusterAddrTable() {
+ return clusterAddrTable;
+ }
+
+
+ public void setClusterAddrTable(HashMap<String, Set<String>> clusterAddrTable) {
+ this.clusterAddrTable = clusterAddrTable;
+ }
+
+
+ public String[] retrieveAllAddrByCluster(String cluster) {
+ List<String> addrs = new ArrayList<String>();
+ if (clusterAddrTable.containsKey(cluster)) {
+ Set<String> brokerNames = clusterAddrTable.get(cluster);
+ for (String brokerName : brokerNames) {
+ BrokerData brokerData = brokerAddrTable.get(brokerName);
+ if (null != brokerData) {
+ addrs.addAll(brokerData.getBrokerAddrs().values());
+ }
+ }
+ }
+
+ return addrs.toArray(new String[]{});
+ }
+
+
+ public String[] retrieveAllClusterNames() {
+ return clusterAddrTable.keySet().toArray(new String[]{});
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/body/Connection.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/Connection.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/Connection.java
new file mode 100644
index 0000000..ed8d9b4
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/Connection.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.body;
+
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class Connection {
+ private String clientId;
+ private String clientAddr;
+ private LanguageCode language;
+ private int version;
+
+
+ public String getClientId() {
+ return clientId;
+ }
+
+
+ public void setClientId(String clientId) {
+ this.clientId = clientId;
+ }
+
+
+ public String getClientAddr() {
+ return clientAddr;
+ }
+
+
+ public void setClientAddr(String clientAddr) {
+ this.clientAddr = clientAddr;
+ }
+
+
+ public LanguageCode getLanguage() {
+ return language;
+ }
+
+
+ public void setLanguage(LanguageCode language) {
+ this.language = language;
+ }
+
+
+ public int getVersion() {
+ return version;
+ }
+
+
+ public void setVersion(int version) {
+ this.version = version;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeByWho.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeByWho.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeByWho.java
new file mode 100644
index 0000000..e6d2cad
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeByWho.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.body;
+
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.HashSet;
+
+
+/**
+ * @author shijia.wxr
+ *
+ */
+public class ConsumeByWho extends RemotingSerializable {
+ private HashSet<String> consumedGroup = new HashSet<String>();
+ private HashSet<String> notConsumedGroup = new HashSet<String>();
+ private String topic;
+ private int queueId;
+ private long offset;
+
+
+ public HashSet<String> getConsumedGroup() {
+ return consumedGroup;
+ }
+
+
+ public void setConsumedGroup(HashSet<String> consumedGroup) {
+ this.consumedGroup = consumedGroup;
+ }
+
+
+ public HashSet<String> getNotConsumedGroup() {
+ return notConsumedGroup;
+ }
+
+
+ public void setNotConsumedGroup(HashSet<String> notConsumedGroup) {
+ this.notConsumedGroup = notConsumedGroup;
+ }
+
+
+ public String getTopic() {
+ return topic;
+ }
+
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+
+ public int getQueueId() {
+ return queueId;
+ }
+
+
+ public void setQueueId(int queueId) {
+ this.queueId = queueId;
+ }
+
+
+ public long getOffset() {
+ return offset;
+ }
+
+
+ public void setOffset(long offset) {
+ this.offset = offset;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeMessageDirectlyResult.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeMessageDirectlyResult.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeMessageDirectlyResult.java
new file mode 100644
index 0000000..9c63010
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeMessageDirectlyResult.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.body;
+
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+
+public class ConsumeMessageDirectlyResult extends RemotingSerializable {
+ private boolean order = false;
+ private boolean autoCommit = true;
+ private CMResult consumeResult;
+ private String remark;
+ private long spentTimeMills;
+
+
+ public boolean isOrder() {
+ return order;
+ }
+
+
+ public void setOrder(boolean order) {
+ this.order = order;
+ }
+
+
+ public boolean isAutoCommit() {
+ return autoCommit;
+ }
+
+
+ public void setAutoCommit(boolean autoCommit) {
+ this.autoCommit = autoCommit;
+ }
+
+
+ public String getRemark() {
+ return remark;
+ }
+
+
+ public void setRemark(String remark) {
+ this.remark = remark;
+ }
+
+
+ public CMResult getConsumeResult() {
+ return consumeResult;
+ }
+
+
+ public void setConsumeResult(CMResult consumeResult) {
+ this.consumeResult = consumeResult;
+ }
+
+
+ public long getSpentTimeMills() {
+ return spentTimeMills;
+ }
+
+
+ public void setSpentTimeMills(long spentTimeMills) {
+ this.spentTimeMills = spentTimeMills;
+ }
+
+
+ @Override
+ public String toString() {
+ return "ConsumeMessageDirectlyResult [order=" + order + ", autoCommit=" + autoCommit
+ + ", consumeResult=" + consumeResult + ", remark=" + remark + ", spentTimeMills="
+ + spentTimeMills + "]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeStatsList.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeStatsList.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeStatsList.java
new file mode 100644
index 0000000..8d1396a
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeStatsList.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.body;
+
+import org.apache.rocketmq.common.admin.ConsumeStats;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class ConsumeStatsList extends RemotingSerializable {
+ private List<Map<String/*subscriptionGroupName*/, List<ConsumeStats>>> consumeStatsList = new ArrayList<Map<String/*subscriptionGroupName*/, List<ConsumeStats>>>();
+ private String brokerAddr;
+ private long totalDiff;
+
+ public List<Map<String, List<ConsumeStats>>> getConsumeStatsList() {
+ return consumeStatsList;
+ }
+
+ public void setConsumeStatsList(List<Map<String, List<ConsumeStats>>> consumeStatsList) {
+ this.consumeStatsList = consumeStatsList;
+ }
+
+ public String getBrokerAddr() {
+ return brokerAddr;
+ }
+
+ public void setBrokerAddr(String brokerAddr) {
+ this.brokerAddr = brokerAddr;
+ }
+
+ public long getTotalDiff() {
+ return totalDiff;
+ }
+
+ public void setTotalDiff(long totalDiff) {
+ this.totalDiff = totalDiff;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeStatus.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeStatus.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeStatus.java
new file mode 100644
index 0000000..35b6a02
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeStatus.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.body;
+
+public class ConsumeStatus {
+ private double pullRT;
+ private double pullTPS;
+ private double consumeRT;
+ private double consumeOKTPS;
+ private double consumeFailedTPS;
+
+ private long consumeFailedMsgs;
+
+
+ public double getPullRT() {
+ return pullRT;
+ }
+
+
+ public void setPullRT(double pullRT) {
+ this.pullRT = pullRT;
+ }
+
+
+ public double getPullTPS() {
+ return pullTPS;
+ }
+
+
+ public void setPullTPS(double pullTPS) {
+ this.pullTPS = pullTPS;
+ }
+
+
+ public double getConsumeRT() {
+ return consumeRT;
+ }
+
+
+ public void setConsumeRT(double consumeRT) {
+ this.consumeRT = consumeRT;
+ }
+
+
+ public double getConsumeOKTPS() {
+ return consumeOKTPS;
+ }
+
+
+ public void setConsumeOKTPS(double consumeOKTPS) {
+ this.consumeOKTPS = consumeOKTPS;
+ }
+
+
+ public double getConsumeFailedTPS() {
+ return consumeFailedTPS;
+ }
+
+
+ public void setConsumeFailedTPS(double consumeFailedTPS) {
+ this.consumeFailedTPS = consumeFailedTPS;
+ }
+
+
+ public long getConsumeFailedMsgs() {
+ return consumeFailedMsgs;
+ }
+
+
+ public void setConsumeFailedMsgs(long consumeFailedMsgs) {
+ this.consumeFailedMsgs = consumeFailedMsgs;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerConnection.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerConnection.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerConnection.java
new file mode 100644
index 0000000..fc3ce46
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerConnection.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.body;
+
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.HashSet;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class ConsumerConnection extends RemotingSerializable {
+ private HashSet<Connection> connectionSet = new HashSet<Connection>();
+ private ConcurrentHashMap<String/* Topic */, SubscriptionData> subscriptionTable =
+ new ConcurrentHashMap<String, SubscriptionData>();
+ private ConsumeType consumeType;
+ private MessageModel messageModel;
+ private ConsumeFromWhere consumeFromWhere;
+
+
+ public int computeMinVersion() {
+ int minVersion = Integer.MAX_VALUE;
+ for (Connection c : this.connectionSet) {
+ if (c.getVersion() < minVersion) {
+ minVersion = c.getVersion();
+ }
+ }
+
+ return minVersion;
+ }
+
+
+ public HashSet<Connection> getConnectionSet() {
+ return connectionSet;
+ }
+
+
+ public void setConnectionSet(HashSet<Connection> connectionSet) {
+ this.connectionSet = connectionSet;
+ }
+
+
+ public ConcurrentHashMap<String, SubscriptionData> getSubscriptionTable() {
+ return subscriptionTable;
+ }
+
+
+ public void setSubscriptionTable(ConcurrentHashMap<String, SubscriptionData> subscriptionTable) {
+ this.subscriptionTable = subscriptionTable;
+ }
+
+
+ public ConsumeType getConsumeType() {
+ return consumeType;
+ }
+
+
+ public void setConsumeType(ConsumeType consumeType) {
+ this.consumeType = consumeType;
+ }
+
+
+ public MessageModel getMessageModel() {
+ return messageModel;
+ }
+
+
+ public void setMessageModel(MessageModel messageModel) {
+ this.messageModel = messageModel;
+ }
+
+
+ public ConsumeFromWhere getConsumeFromWhere() {
+ return consumeFromWhere;
+ }
+
+
+ public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) {
+ this.consumeFromWhere = consumeFromWhere;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerOffsetSerializeWrapper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerOffsetSerializeWrapper.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerOffsetSerializeWrapper.java
new file mode 100644
index 0000000..5b4c6fb
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerOffsetSerializeWrapper.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.body;
+
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * @author manhong.yqd
+ */
+public class ConsumerOffsetSerializeWrapper extends RemotingSerializable {
+ private ConcurrentHashMap<String/* topic@group */, ConcurrentHashMap<Integer, Long>> offsetTable =
+ new ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>>(512);
+
+
+ public ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> getOffsetTable() {
+ return offsetTable;
+ }
+
+
+ public void setOffsetTable(ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> offsetTable) {
+ this.offsetTable = offsetTable;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java
new file mode 100644
index 0000000..9b0b383
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java
@@ -0,0 +1,315 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.body;
+
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.*;
+import java.util.Map.Entry;
+
+public class ConsumerRunningInfo extends RemotingSerializable {
+ public static final String PROP_NAMESERVER_ADDR = "PROP_NAMESERVER_ADDR";
+ public static final String PROP_THREADPOOL_CORE_SIZE = "PROP_THREADPOOL_CORE_SIZE";
+ public static final String PROP_CONSUME_ORDERLY = "PROP_CONSUMEORDERLY";
+ public static final String PROP_CONSUME_TYPE = "PROP_CONSUME_TYPE";
+ public static final String PROP_CLIENT_VERSION = "PROP_CLIENT_VERSION";
+ public static final String PROP_CONSUMER_START_TIMESTAMP = "PROP_CONSUMER_START_TIMESTAMP";
+
+
+ private Properties properties = new Properties();
+
+ private TreeSet<SubscriptionData> subscriptionSet = new TreeSet<SubscriptionData>();
+
+ private TreeMap<MessageQueue, ProcessQueueInfo> mqTable = new TreeMap<MessageQueue, ProcessQueueInfo>();
+
+ private TreeMap<String/* Topic */, ConsumeStatus> statusTable = new TreeMap<String, ConsumeStatus>();
+
+ private String jstack;
+
+ public static boolean analyzeSubscription(final TreeMap<String/* clientId */, ConsumerRunningInfo> criTable) {
+ ConsumerRunningInfo prev = criTable.firstEntry().getValue();
+
+ boolean push = false;
+ {
+ String property = prev.getProperties().getProperty(ConsumerRunningInfo.PROP_CONSUME_TYPE);
+
+ if (property == null) {
+ property = ((ConsumeType) prev.getProperties().get(ConsumerRunningInfo.PROP_CONSUME_TYPE)).name();
+ }
+ push = ConsumeType.valueOf(property) == ConsumeType.CONSUME_PASSIVELY;
+ }
+
+ boolean startForAWhile = false;
+ {
+
+ String property = prev.getProperties().getProperty(ConsumerRunningInfo.PROP_CONSUMER_START_TIMESTAMP);
+ if (property == null) {
+ property = String.valueOf(prev.getProperties().get(ConsumerRunningInfo.PROP_CONSUMER_START_TIMESTAMP));
+ }
+ startForAWhile = (System.currentTimeMillis() - Long.parseLong(property)) > (1000 * 60 * 2);
+ }
+
+ if (push && startForAWhile) {
+
+ {
+ Iterator<Entry<String, ConsumerRunningInfo>> it = criTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<String, ConsumerRunningInfo> next = it.next();
+ ConsumerRunningInfo current = next.getValue();
+ boolean equals = current.getSubscriptionSet().equals(prev.getSubscriptionSet());
+
+ if (!equals) {
+ // Different subscription in the same group of consumer
+ return false;
+ }
+
+ prev = next.getValue();
+ }
+
+ if (prev != null) {
+
+ if (prev.getSubscriptionSet().isEmpty()) {
+ // Subscription empty!
+ return false;
+ }
+ }
+ }
+ }
+
+ return true;
+ }
+
+ public Properties getProperties() {
+ return properties;
+ }
+
+ public void setProperties(Properties properties) {
+ this.properties = properties;
+ }
+
+ public TreeSet<SubscriptionData> getSubscriptionSet() {
+ return subscriptionSet;
+ }
+
+ public void setSubscriptionSet(TreeSet<SubscriptionData> subscriptionSet) {
+ this.subscriptionSet = subscriptionSet;
+ }
+
+ public static boolean analyzeRebalance(final TreeMap<String/* clientId */, ConsumerRunningInfo> criTable) {
+ return true;
+ }
+
+ public static String analyzeProcessQueue(final String clientId, ConsumerRunningInfo info) {
+ StringBuilder sb = new StringBuilder();
+ boolean push = false;
+ {
+ String property = info.getProperties().getProperty(ConsumerRunningInfo.PROP_CONSUME_TYPE);
+
+ if (property == null) {
+ property = ((ConsumeType) info.getProperties().get(ConsumerRunningInfo.PROP_CONSUME_TYPE)).name();
+ }
+ push = ConsumeType.valueOf(property) == ConsumeType.CONSUME_PASSIVELY;
+ }
+
+ boolean orderMsg = false;
+ {
+ String property = info.getProperties().getProperty(ConsumerRunningInfo.PROP_CONSUME_ORDERLY);
+ orderMsg = Boolean.parseBoolean(property);
+ }
+
+ if (push) {
+ Iterator<Entry<MessageQueue, ProcessQueueInfo>> it = info.getMqTable().entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<MessageQueue, ProcessQueueInfo> next = it.next();
+ MessageQueue mq = next.getKey();
+ ProcessQueueInfo pq = next.getValue();
+
+
+ if (orderMsg) {
+
+ if (!pq.isLocked()) {
+ sb.append(String.format("%s %s can't lock for a while, %dms%n", //
+ clientId, //
+ mq, //
+ System.currentTimeMillis() - pq.getLastLockTimestamp()));
+ } else {
+ if (pq.isDroped() && (pq.getTryUnlockTimes() > 0)) {
+ sb.append(String.format("%s %s unlock %d times, still failed%n", //
+ clientId, //
+ mq, //
+ pq.getTryUnlockTimes()));
+ }
+ }
+
+
+ } else {
+ long diff = System.currentTimeMillis() - pq.getLastConsumeTimestamp();
+
+ if (diff > (1000 * 60) && pq.getCachedMsgCount() > 0) {
+ sb.append(String.format("%s %s can't consume for a while, maybe blocked, %dms%n", //
+ clientId, //
+ mq, //
+ diff));
+ }
+ }
+ }
+ }
+
+ return sb.toString();
+ }
+
+ public TreeMap<MessageQueue, ProcessQueueInfo> getMqTable() {
+ return mqTable;
+ }
+
+ public void setMqTable(TreeMap<MessageQueue, ProcessQueueInfo> mqTable) {
+ this.mqTable = mqTable;
+ }
+
+ public TreeMap<String, ConsumeStatus> getStatusTable() {
+ return statusTable;
+ }
+
+ public void setStatusTable(TreeMap<String, ConsumeStatus> statusTable) {
+ this.statusTable = statusTable;
+ }
+
+ public String formatString() {
+ StringBuilder sb = new StringBuilder();
+
+ {
+ sb.append("#Consumer Properties#\n");
+ Iterator<Entry<Object, Object>> it = this.properties.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<Object, Object> next = it.next();
+ String item = String.format("%-40s: %s%n", next.getKey().toString(), next.getValue().toString());
+ sb.append(item);
+ }
+ }
+
+ {
+ sb.append("\n\n#Consumer Subscription#\n");
+
+ Iterator<SubscriptionData> it = this.subscriptionSet.iterator();
+ int i = 0;
+ while (it.hasNext()) {
+ SubscriptionData next = it.next();
+ String item = String.format("%03d Topic: %-40s ClassFilter: %-8s SubExpression: %s%n", //
+ ++i, //
+ next.getTopic(), //
+ next.isClassFilterMode(), //
+ next.getSubString());
+
+ sb.append(item);
+ }
+ }
+
+ {
+ sb.append("\n\n#Consumer Offset#\n");
+ sb.append(String.format("%-32s %-32s %-4s %-20s%n", //
+ "#Topic", //
+ "#Broker Name", //
+ "#QID", //
+ "#Consumer Offset"//
+ ));
+
+ Iterator<Entry<MessageQueue, ProcessQueueInfo>> it = this.mqTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<MessageQueue, ProcessQueueInfo> next = it.next();
+ String item = String.format("%-32s %-32s %-4d %-20d%n", //
+ next.getKey().getTopic(), //
+ next.getKey().getBrokerName(), //
+ next.getKey().getQueueId(), //
+ next.getValue().getCommitOffset());
+
+ sb.append(item);
+ }
+ }
+
+ {
+ sb.append("\n\n#Consumer MQ Detail#\n");
+ sb.append(String.format("%-32s %-32s %-4s %-20s%n", //
+ "#Topic", //
+ "#Broker Name", //
+ "#QID", //
+ "#ProcessQueueInfo"//
+ ));
+
+ Iterator<Entry<MessageQueue, ProcessQueueInfo>> it = this.mqTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<MessageQueue, ProcessQueueInfo> next = it.next();
+ String item = String.format("%-32s %-32s %-4d %s%n", //
+ next.getKey().getTopic(), //
+ next.getKey().getBrokerName(), //
+ next.getKey().getQueueId(), //
+ next.getValue().toString());
+
+ sb.append(item);
+ }
+ }
+
+ {
+ sb.append("\n\n#Consumer RT&TPS#\n");
+ sb.append(String.format("%-32s %14s %14s %14s %14s %18s %25s%n", //
+ "#Topic", //
+ "#Pull RT", //
+ "#Pull TPS", //
+ "#Consume RT", //
+ "#ConsumeOK TPS", //
+ "#ConsumeFailed TPS", //
+ "#ConsumeFailedMsgsInHour"//
+ ));
+
+ Iterator<Entry<String, ConsumeStatus>> it = this.statusTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<String, ConsumeStatus> next = it.next();
+ String item = String.format("%-32s %14.2f %14.2f %14.2f %14.2f %18.2f %25d%n", //
+ next.getKey(), //
+ next.getValue().getPullRT(), //
+ next.getValue().getPullTPS(), //
+ next.getValue().getConsumeRT(), //
+ next.getValue().getConsumeOKTPS(), //
+ next.getValue().getConsumeFailedTPS(), //
+ next.getValue().getConsumeFailedMsgs()//
+ );
+
+ sb.append(item);
+ }
+ }
+
+ if (this.jstack != null) {
+ sb.append("\n\n#Consumer jstack#\n");
+ sb.append(this.jstack);
+ }
+
+ return sb.toString();
+ }
+
+ public String getJstack() {
+ return jstack;
+ }
+
+
+ public void setJstack(String jstack) {
+ this.jstack = jstack;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/body/GetConsumerStatusBody.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/GetConsumerStatusBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/GetConsumerStatusBody.java
new file mode 100644
index 0000000..71d8667
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/GetConsumerStatusBody.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.body;
+
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * @author manhong.yqd
+ */
+@Deprecated
+public class GetConsumerStatusBody extends RemotingSerializable {
+ private Map<MessageQueue, Long> messageQueueTable = new HashMap<MessageQueue, Long>();
+ private Map<String, Map<MessageQueue, Long>> consumerTable =
+ new HashMap<String, Map<MessageQueue, Long>>();
+
+
+ public Map<MessageQueue, Long> getMessageQueueTable() {
+ return messageQueueTable;
+ }
+
+
+ public void setMessageQueueTable(Map<MessageQueue, Long> messageQueueTable) {
+ this.messageQueueTable = messageQueueTable;
+ }
+
+
+ public Map<String, Map<MessageQueue, Long>> getConsumerTable() {
+ return consumerTable;
+ }
+
+
+ public void setConsumerTable(Map<String, Map<MessageQueue, Long>> consumerTable) {
+ this.consumerTable = consumerTable;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/protocol/body/GroupList.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/GroupList.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/GroupList.java
new file mode 100644
index 0000000..db7e071
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/GroupList.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.body;
+
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.HashSet;
+
+
+/**
+ * @author shijia.wxr
+ *
+ */
+public class GroupList extends RemotingSerializable {
+ private HashSet<String> groupList = new HashSet<String>();
+
+
+ public HashSet<String> getGroupList() {
+ return groupList;
+ }
+
+
+ public void setGroupList(HashSet<String> groupList) {
+ this.groupList = groupList;
+ }
+}