You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/19 09:40:36 UTC
[19/43] incubator-rocketmq git commit: Finish code dump. Reviewed by:
@yukon @vongosling @stevenschew @vintagewang @lollipop @zander
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/heartbeat/SubscriptionData.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/heartbeat/SubscriptionData.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/heartbeat/SubscriptionData.java
new file mode 100644
index 0000000..b4b72fc
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/heartbeat/SubscriptionData.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * $Id: SubscriptionData.java 1835 2013-05-16 02:00:50Z shijia.wxr $
+ */
+package com.alibaba.rocketmq.common.protocol.heartbeat;
+
+import com.alibaba.fastjson.annotation.JSONField;
+
+import java.util.HashSet;
+import java.util.Set;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class SubscriptionData implements Comparable<SubscriptionData> {
+ public final static String SUB_ALL = "*";
+ private boolean classFilterMode = false;
+ private String topic;
+ private String subString;
+ private Set<String> tagsSet = new HashSet<String>();
+ private Set<Integer> codeSet = new HashSet<Integer>();
+ private long subVersion = System.currentTimeMillis();
+
+ @JSONField(serialize = false)
+ private String filterClassSource;
+
+
+ public SubscriptionData() {
+
+ }
+
+
+ public SubscriptionData(String topic, String subString) {
+ super();
+ this.topic = topic;
+ this.subString = subString;
+ }
+
+ public String getFilterClassSource() {
+ return filterClassSource;
+ }
+
+ public void setFilterClassSource(String filterClassSource) {
+ this.filterClassSource = filterClassSource;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+
+ public String getSubString() {
+ return subString;
+ }
+
+
+ public void setSubString(String subString) {
+ this.subString = subString;
+ }
+
+
+ public Set<String> getTagsSet() {
+ return tagsSet;
+ }
+
+
+ public void setTagsSet(Set<String> tagsSet) {
+ this.tagsSet = tagsSet;
+ }
+
+
+ public long getSubVersion() {
+ return subVersion;
+ }
+
+
+ public void setSubVersion(long subVersion) {
+ this.subVersion = subVersion;
+ }
+
+
+ public Set<Integer> getCodeSet() {
+ return codeSet;
+ }
+
+
+ public void setCodeSet(Set<Integer> codeSet) {
+ this.codeSet = codeSet;
+ }
+
+
+ public boolean isClassFilterMode() {
+ return classFilterMode;
+ }
+
+
+ public void setClassFilterMode(boolean classFilterMode) {
+ this.classFilterMode = classFilterMode;
+ }
+
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + (classFilterMode ? 1231 : 1237);
+ result = prime * result + ((codeSet == null) ? 0 : codeSet.hashCode());
+ result = prime * result + ((subString == null) ? 0 : subString.hashCode());
+ result = prime * result + ((tagsSet == null) ? 0 : tagsSet.hashCode());
+ result = prime * result + ((topic == null) ? 0 : topic.hashCode());
+ return result;
+ }
+
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ SubscriptionData other = (SubscriptionData) obj;
+ if (classFilterMode != other.classFilterMode)
+ return false;
+ if (codeSet == null) {
+ if (other.codeSet != null)
+ return false;
+ } else if (!codeSet.equals(other.codeSet))
+ return false;
+ if (subString == null) {
+ if (other.subString != null)
+ return false;
+ } else if (!subString.equals(other.subString))
+ return false;
+ if (subVersion != other.subVersion)
+ return false;
+ if (tagsSet == null) {
+ if (other.tagsSet != null)
+ return false;
+ } else if (!tagsSet.equals(other.tagsSet))
+ return false;
+ if (topic == null) {
+ if (other.topic != null)
+ return false;
+ } else if (!topic.equals(other.topic))
+ return false;
+ return true;
+ }
+
+
+ @Override
+ public String toString() {
+ return "SubscriptionData [classFilterMode=" + classFilterMode + ", topic=" + topic + ", subString="
+ + subString + ", tagsSet=" + tagsSet + ", codeSet=" + codeSet + ", subVersion=" + subVersion
+ + "]";
+ }
+
+
+ @Override
+ public int compareTo(SubscriptionData other) {
+ String thisValue = this.topic + "@" + this.subString;
+ String otherValue = other.topic + "@" + other.subString;
+ return thisValue.compareTo(otherValue);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/route/BrokerData.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/route/BrokerData.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/route/BrokerData.java
new file mode 100644
index 0000000..322953a
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/route/BrokerData.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * $Id: BrokerData.java 1835 2013-05-16 02:00:50Z shijia.wxr $
+ */
+package com.alibaba.rocketmq.common.protocol.route;
+
+import com.alibaba.rocketmq.common.MixAll;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * @author shijia.wxr
+ *
+ */
+public class BrokerData implements Comparable<BrokerData> {
+ private String cluster;
+ private String brokerName;
+ private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
+
+ public String selectBrokerAddr() {
+ String value = this.brokerAddrs.get(MixAll.MASTER_ID);
+ if (null == value) {
+ for (Map.Entry<Long, String> entry : this.brokerAddrs.entrySet()) {
+ return entry.getValue();
+ }
+ }
+
+ return value;
+ }
+
+ public HashMap<Long, String> getBrokerAddrs() {
+ return brokerAddrs;
+ }
+
+ public void setBrokerAddrs(HashMap<Long, String> brokerAddrs) {
+ this.brokerAddrs = brokerAddrs;
+ }
+
+ public String getCluster() {
+ return cluster;
+ }
+
+ public void setCluster(String cluster) {
+ this.cluster = cluster;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((brokerAddrs == null) ? 0 : brokerAddrs.hashCode());
+ result = prime * result + ((brokerName == null) ? 0 : brokerName.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ BrokerData other = (BrokerData) obj;
+ if (brokerAddrs == null) {
+ if (other.brokerAddrs != null)
+ return false;
+ } else if (!brokerAddrs.equals(other.brokerAddrs))
+ return false;
+ if (brokerName == null) {
+ if (other.brokerName != null)
+ return false;
+ } else if (!brokerName.equals(other.brokerName))
+ return false;
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "BrokerData [brokerName=" + brokerName + ", brokerAddrs=" + brokerAddrs + "]";
+ }
+
+ @Override
+ public int compareTo(BrokerData o) {
+ return this.brokerName.compareTo(o.getBrokerName());
+ }
+
+ public String getBrokerName() {
+ return brokerName;
+ }
+
+ public void setBrokerName(String brokerName) {
+ this.brokerName = brokerName;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/route/QueueData.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/route/QueueData.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/route/QueueData.java
new file mode 100644
index 0000000..6f62340
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/route/QueueData.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * $Id: QueueData.java 1835 2013-05-16 02:00:50Z shijia.wxr $
+ */
+package com.alibaba.rocketmq.common.protocol.route;
+
+public class QueueData implements Comparable<QueueData> {
+ private String brokerName;
+ private int readQueueNums;
+ private int writeQueueNums;
+ private int perm;
+ private int topicSynFlag;
+
+ public int getReadQueueNums() {
+ return readQueueNums;
+ }
+
+ public void setReadQueueNums(int readQueueNums) {
+ this.readQueueNums = readQueueNums;
+ }
+
+ public int getWriteQueueNums() {
+ return writeQueueNums;
+ }
+
+ public void setWriteQueueNums(int writeQueueNums) {
+ this.writeQueueNums = writeQueueNums;
+ }
+
+ public int getPerm() {
+ return perm;
+ }
+
+ public void setPerm(int perm) {
+ this.perm = perm;
+ }
+
+ public int getTopicSynFlag() {
+ return topicSynFlag;
+ }
+
+ public void setTopicSynFlag(int topicSynFlag) {
+ this.topicSynFlag = topicSynFlag;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((brokerName == null) ? 0 : brokerName.hashCode());
+ result = prime * result + perm;
+ result = prime * result + readQueueNums;
+ result = prime * result + writeQueueNums;
+ result = prime * result + topicSynFlag;
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ QueueData other = (QueueData) obj;
+ if (brokerName == null) {
+ if (other.brokerName != null)
+ return false;
+ } else if (!brokerName.equals(other.brokerName))
+ return false;
+ if (perm != other.perm)
+ return false;
+ if (readQueueNums != other.readQueueNums)
+ return false;
+ if (writeQueueNums != other.writeQueueNums)
+ return false;
+ if (topicSynFlag != other.topicSynFlag)
+ return false;
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "QueueData [brokerName=" + brokerName + ", readQueueNums=" + readQueueNums
+ + ", writeQueueNums=" + writeQueueNums + ", perm=" + perm + ", topicSynFlag=" + topicSynFlag
+ + "]";
+ }
+
+ @Override
+ public int compareTo(QueueData o) {
+ return this.brokerName.compareTo(o.getBrokerName());
+ }
+
+ public String getBrokerName() {
+ return brokerName;
+ }
+
+ public void setBrokerName(String brokerName) {
+ this.brokerName = brokerName;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/route/TopicRouteData.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/route/TopicRouteData.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/route/TopicRouteData.java
new file mode 100644
index 0000000..72e1b96
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/route/TopicRouteData.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * $Id: TopicRouteData.java 1835 2013-05-16 02:00:50Z shijia.wxr $
+ */
+package com.alibaba.rocketmq.common.protocol.route;
+
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class TopicRouteData extends RemotingSerializable {
+ private String orderTopicConf;
+ private List<QueueData> queueDatas;
+ private List<BrokerData> brokerDatas;
+ private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
+
+
+ public TopicRouteData cloneTopicRouteData() {
+ TopicRouteData topicRouteData = new TopicRouteData();
+ topicRouteData.setQueueDatas(new ArrayList<QueueData>());
+ topicRouteData.setBrokerDatas(new ArrayList<BrokerData>());
+ topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
+ topicRouteData.setOrderTopicConf(this.orderTopicConf);
+
+ if (this.queueDatas != null) {
+ topicRouteData.getQueueDatas().addAll(this.queueDatas);
+ }
+
+ if (this.brokerDatas != null) {
+ topicRouteData.getBrokerDatas().addAll(this.brokerDatas);
+ }
+
+ if (this.filterServerTable != null) {
+ topicRouteData.getFilterServerTable().putAll(this.filterServerTable);
+ }
+
+ return topicRouteData;
+ }
+
+
+ public List<QueueData> getQueueDatas() {
+ return queueDatas;
+ }
+
+
+ public void setQueueDatas(List<QueueData> queueDatas) {
+ this.queueDatas = queueDatas;
+ }
+
+
+ public List<BrokerData> getBrokerDatas() {
+ return brokerDatas;
+ }
+
+
+ public void setBrokerDatas(List<BrokerData> brokerDatas) {
+ this.brokerDatas = brokerDatas;
+ }
+
+ public HashMap<String, List<String>> getFilterServerTable() {
+ return filterServerTable;
+ }
+
+ public void setFilterServerTable(HashMap<String, List<String>> filterServerTable) {
+ this.filterServerTable = filterServerTable;
+ }
+
+ public String getOrderTopicConf() {
+ return orderTopicConf;
+ }
+
+ public void setOrderTopicConf(String orderTopicConf) {
+ this.orderTopicConf = orderTopicConf;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((brokerDatas == null) ? 0 : brokerDatas.hashCode());
+ result = prime * result + ((orderTopicConf == null) ? 0 : orderTopicConf.hashCode());
+ result = prime * result + ((queueDatas == null) ? 0 : queueDatas.hashCode());
+ result = prime * result + ((filterServerTable == null) ? 0 : filterServerTable.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ TopicRouteData other = (TopicRouteData) obj;
+ if (brokerDatas == null) {
+ if (other.brokerDatas != null)
+ return false;
+ } else if (!brokerDatas.equals(other.brokerDatas))
+ return false;
+ if (orderTopicConf == null) {
+ if (other.orderTopicConf != null)
+ return false;
+ } else if (!orderTopicConf.equals(other.orderTopicConf))
+ return false;
+ if (queueDatas == null) {
+ if (other.queueDatas != null)
+ return false;
+ } else if (!queueDatas.equals(other.queueDatas))
+ return false;
+ if (filterServerTable == null) {
+ if (other.filterServerTable != null)
+ return false;
+ } else if (!filterServerTable.equals(other.filterServerTable))
+ return false;
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "TopicRouteData [orderTopicConf=" + orderTopicConf + ", queueDatas=" + queueDatas
+ + ", brokerDatas=" + brokerDatas + ", filterServerTable=" + filterServerTable + "]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/topic/OffsetMovedEvent.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/topic/OffsetMovedEvent.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/topic/OffsetMovedEvent.java
new file mode 100644
index 0000000..86bdd3d
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/topic/OffsetMovedEvent.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.protocol.topic;
+
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+
+
+public class OffsetMovedEvent extends RemotingSerializable {
+ private String consumerGroup;
+ private MessageQueue messageQueue;
+ private long offsetRequest;
+ private long offsetNew;
+
+
+ public String getConsumerGroup() {
+ return consumerGroup;
+ }
+
+
+ public void setConsumerGroup(String consumerGroup) {
+ this.consumerGroup = consumerGroup;
+ }
+
+
+ public MessageQueue getMessageQueue() {
+ return messageQueue;
+ }
+
+
+ public void setMessageQueue(MessageQueue messageQueue) {
+ this.messageQueue = messageQueue;
+ }
+
+
+ public long getOffsetRequest() {
+ return offsetRequest;
+ }
+
+
+ public void setOffsetRequest(long offsetRequest) {
+ this.offsetRequest = offsetRequest;
+ }
+
+
+ public long getOffsetNew() {
+ return offsetNew;
+ }
+
+
+ public void setOffsetNew(long offsetNew) {
+ this.offsetNew = offsetNew;
+ }
+
+
+ @Override
+ public String toString() {
+ return "OffsetMovedEvent [consumerGroup=" + consumerGroup + ", messageQueue=" + messageQueue
+ + ", offsetRequest=" + offsetRequest + ", offsetNew=" + offsetNew + "]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/queue/ConcurrentTreeMap.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/queue/ConcurrentTreeMap.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/queue/ConcurrentTreeMap.java
new file mode 100644
index 0000000..8fc4e76
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/queue/ConcurrentTreeMap.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.queue;
+
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Comparator;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.locks.ReentrantLock;
+
+
+/**
+ * thread safe
+ *
+ * @author lansheng.zj
+ */
+public class ConcurrentTreeMap<K, V> {
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+ private final ReentrantLock lock;
+ private TreeMap<K, V> tree;
+ private RoundQueue<K> roundQueue;
+
+
+ public ConcurrentTreeMap(int capacity, Comparator<? super K> comparator) {
+ tree = new TreeMap<K, V>(comparator);
+ roundQueue = new RoundQueue<K>(capacity);
+ lock = new ReentrantLock(true);
+ }
+
+
+ public Map.Entry<K, V> pollFirstEntry() {
+ lock.lock();
+ try {
+ return tree.pollFirstEntry();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+
+ public V putIfAbsentAndRetExsit(K key, V value) {
+ lock.lock();
+ try {
+ if (roundQueue.put(key)) {
+ V exsit = tree.get(key);
+ if (null == exsit) {
+ tree.put(key, value);
+ exsit = value;
+ }
+ log.warn("putIfAbsentAndRetExsit success. {}", key);
+ return exsit;
+ }
+
+ else {
+ V exsit = tree.get(key);
+ return exsit;
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/queue/RoundQueue.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/queue/RoundQueue.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/queue/RoundQueue.java
new file mode 100644
index 0000000..a3783ba
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/queue/RoundQueue.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.queue;
+
+import java.util.LinkedList;
+import java.util.Queue;
+
+
+/**
+ * not thread safe
+ *
+ * @author lansheng.zj
+ */
+public class RoundQueue<E> {
+
+ private Queue<E> queue;
+ private int capacity;
+
+
+ public RoundQueue(int capacity) {
+ this.capacity = capacity;
+ queue = new LinkedList<E>();
+ }
+
+
+ public boolean put(E e) {
+ boolean ok = false;
+ if (!queue.contains(e)) {
+ if (queue.size() >= capacity) {
+ queue.poll();
+ }
+ queue.add(e);
+ ok = true;
+ }
+
+ return ok;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/running/RunningStats.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/running/RunningStats.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/running/RunningStats.java
new file mode 100644
index 0000000..aa0bc54
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/running/RunningStats.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.common.running;
+
+public enum RunningStats {
+ commitLogMaxOffset,
+ commitLogMinOffset,
+ commitLogDiskRatio,
+ consumeQueueDiskRatio,
+ scheduleMessageOffset,
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/MomentStatsItem.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/MomentStatsItem.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/MomentStatsItem.java
new file mode 100644
index 0000000..89eefa5
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/MomentStatsItem.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.stats;
+
+import com.alibaba.rocketmq.common.UtilAll;
+import org.slf4j.Logger;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+public class MomentStatsItem {
+
+ private final AtomicLong value = new AtomicLong(0);
+
+ private final String statsName;
+ private final String statsKey;
+ private final ScheduledExecutorService scheduledExecutorService;
+ private final Logger log;
+
+
+ public MomentStatsItem(String statsName, String statsKey,
+ ScheduledExecutorService scheduledExecutorService, Logger log) {
+ this.statsName = statsName;
+ this.statsKey = statsKey;
+ this.scheduledExecutorService = scheduledExecutorService;
+ this.log = log;
+ }
+
+
+ public void init() {
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ printAtMinutes();
+
+ MomentStatsItem.this.value.set(0);
+ } catch (Throwable e) {
+ }
+ }
+ }, Math.abs(UtilAll.computNextMinutesTimeMillis() - System.currentTimeMillis()), 1000 * 60 * 5, TimeUnit.MILLISECONDS);
+ }
+
+
+ public void printAtMinutes() {
+ log.info(String.format("[%s] [%s] Stats Every 5 Minutes, Value: %d",
+ this.statsName,
+ this.statsKey,
+ this.value.get()));
+ }
+
+ public AtomicLong getValue() {
+ return value;
+ }
+
+
+ public String getStatsKey() {
+ return statsKey;
+ }
+
+
+ public String getStatsName() {
+ return statsName;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/MomentStatsItemSet.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/MomentStatsItemSet.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/MomentStatsItemSet.java
new file mode 100644
index 0000000..fde88cd
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/MomentStatsItemSet.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.stats;
+
+import com.alibaba.rocketmq.common.UtilAll;
+import org.slf4j.Logger;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+
+public class MomentStatsItemSet {
+ private final ConcurrentHashMap<String/* key */, MomentStatsItem> statsItemTable =
+ new ConcurrentHashMap<String, MomentStatsItem>(128);
+ private final String statsName;
+ private final ScheduledExecutorService scheduledExecutorService;
+ private final Logger log;
+
+
+ public MomentStatsItemSet(String statsName, ScheduledExecutorService scheduledExecutorService, Logger log) {
+ this.statsName = statsName;
+ this.scheduledExecutorService = scheduledExecutorService;
+ this.log = log;
+ this.init();
+ }
+
+ public ConcurrentHashMap<String, MomentStatsItem> getStatsItemTable() {
+ return statsItemTable;
+ }
+
+ public String getStatsName() {
+ return statsName;
+ }
+
+ public void init() {
+
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ printAtMinutes();
+ } catch (Throwable e) {
+ }
+ }
+ }, Math.abs(UtilAll.computNextMinutesTimeMillis() - System.currentTimeMillis()), 1000 * 60 * 5, TimeUnit.MILLISECONDS);
+ }
+
+ private void printAtMinutes() {
+ Iterator<Entry<String, MomentStatsItem>> it = this.statsItemTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<String, MomentStatsItem> next = it.next();
+ next.getValue().printAtMinutes();
+ }
+ }
+
+ public void setValue(final String statsKey, final int value) {
+ MomentStatsItem statsItem = this.getAndCreateStatsItem(statsKey);
+ statsItem.getValue().set(value);
+ }
+
+ public MomentStatsItem getAndCreateStatsItem(final String statsKey) {
+ MomentStatsItem statsItem = this.statsItemTable.get(statsKey);
+ if (null == statsItem) {
+ statsItem =
+ new MomentStatsItem(this.statsName, statsKey, this.scheduledExecutorService, this.log);
+ MomentStatsItem prev = this.statsItemTable.put(statsKey, statsItem);
+
+ if (null == prev) {
+
+ // statsItem.init();
+ }
+ }
+
+ return statsItem;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/StatsItem.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/StatsItem.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/StatsItem.java
new file mode 100644
index 0000000..1c99699
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/StatsItem.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.stats;
+
+import com.alibaba.rocketmq.common.UtilAll;
+import org.slf4j.Logger;
+
+import java.util.LinkedList;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+public class StatsItem {
+
+ private final AtomicLong value = new AtomicLong(0);
+
+ private final AtomicLong times = new AtomicLong(0);
+
+ private final LinkedList<CallSnapshot> csListMinute = new LinkedList<CallSnapshot>();
+
+
+ private final LinkedList<CallSnapshot> csListHour = new LinkedList<CallSnapshot>();
+
+
+ private final LinkedList<CallSnapshot> csListDay = new LinkedList<CallSnapshot>();
+
+ private final String statsName;
+ private final String statsKey;
+ private final ScheduledExecutorService scheduledExecutorService;
+ private final Logger log;
+
+
+ public StatsItem(String statsName, String statsKey, ScheduledExecutorService scheduledExecutorService,
+ Logger log) {
+ this.statsName = statsName;
+ this.statsKey = statsKey;
+ this.scheduledExecutorService = scheduledExecutorService;
+ this.log = log;
+ }
+
+ public StatsSnapshot getStatsDataInMinute() {
+ return computeStatsData(this.csListMinute);
+ }
+
+ private static StatsSnapshot computeStatsData(final LinkedList<CallSnapshot> csList) {
+ StatsSnapshot statsSnapshot = new StatsSnapshot();
+ synchronized (csList) {
+ double tps = 0;
+ double avgpt = 0;
+ long sum = 0;
+ if (!csList.isEmpty()) {
+ CallSnapshot first = csList.getFirst();
+ CallSnapshot last = csList.getLast();
+ sum = last.getValue() - first.getValue();
+ tps = (sum * 1000.0d) / (last.getTimestamp() - first.getTimestamp());
+
+ long timesDiff = last.getTimes() - first.getTimes();
+ if (timesDiff > 0) {
+ avgpt = (sum * 1.0d) / timesDiff;
+ }
+ }
+
+ statsSnapshot.setSum(sum);
+ statsSnapshot.setTps(tps);
+ statsSnapshot.setAvgpt(avgpt);
+ }
+
+ return statsSnapshot;
+ }
+
+ public StatsSnapshot getStatsDataInHour() {
+ return computeStatsData(this.csListHour);
+ }
+
+ public StatsSnapshot getStatsDataInDay() {
+ return computeStatsData(this.csListDay);
+ }
+
+ public void init() {
+
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ samplingInSeconds();
+ } catch (Throwable e) {
+ }
+ }
+ }, 0, 10, TimeUnit.SECONDS);
+
+
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ samplingInMinutes();
+ } catch (Throwable e) {
+ }
+ }
+ }, 0, 10, TimeUnit.MINUTES);
+
+
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ samplingInHour();
+ } catch (Throwable e) {
+ }
+ }
+ }, 0, 1, TimeUnit.HOURS);
+
+
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ printAtMinutes();
+ } catch (Throwable ignored) {
+ }
+ }
+ }, Math.abs(UtilAll.computNextMinutesTimeMillis() - System.currentTimeMillis()), 1000 * 60, TimeUnit.MILLISECONDS);
+
+
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ printAtHour();
+ } catch (Throwable ignored) {
+ }
+ }
+ }, Math.abs(UtilAll.computNextHourTimeMillis() - System.currentTimeMillis()), 1000 * 60 * 60, TimeUnit.MILLISECONDS);
+
+
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ printAtDay();
+ } catch (Throwable ignored) {
+ }
+ }
+ }, Math.abs(UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis()) - 2000, 1000 * 60 * 60 * 24, TimeUnit.MILLISECONDS);
+ }
+
+ public void samplingInSeconds() {
+ synchronized (this.csListMinute) {
+ this.csListMinute.add(new CallSnapshot(System.currentTimeMillis(), this.times.get(), this.value
+ .get()));
+ if (this.csListMinute.size() > 7) {
+ this.csListMinute.removeFirst();
+ }
+ }
+ }
+
+ public void samplingInMinutes() {
+ synchronized (this.csListHour) {
+ this.csListHour.add(new CallSnapshot(System.currentTimeMillis(), this.times.get(), this.value
+ .get()));
+ if (this.csListHour.size() > 7) {
+ this.csListHour.removeFirst();
+ }
+ }
+ }
+
+ public void samplingInHour() {
+ synchronized (this.csListDay) {
+ this.csListDay.add(new CallSnapshot(System.currentTimeMillis(), this.times.get(), this.value
+ .get()));
+ if (this.csListDay.size() > 25) {
+ this.csListDay.removeFirst();
+ }
+ }
+ }
+
+ public void printAtMinutes() {
+ StatsSnapshot ss = computeStatsData(this.csListMinute);
+ log.info(String.format("[%s] [%s] Stats In One Minute, SUM: %d TPS: %.2f AVGPT: %.2f",
+ this.statsName,
+ this.statsKey,
+ ss.getSum(),
+ ss.getTps(),
+ ss.getAvgpt()));
+ }
+
+ public void printAtHour() {
+ StatsSnapshot ss = computeStatsData(this.csListHour);
+ log.info(String.format("[%s] [%s] Stats In One Hour, SUM: %d TPS: %.2f AVGPT: %.2f",
+ this.statsName,
+ this.statsKey,
+ ss.getSum(),
+ ss.getTps(),
+ ss.getAvgpt()));
+ }
+
+ public void printAtDay() {
+ StatsSnapshot ss = computeStatsData(this.csListDay);
+ log.info(String.format("[%s] [%s] Stats In One Day, SUM: %d TPS: %.2f AVGPT: %.2f",
+ this.statsName,
+ this.statsKey,
+ ss.getSum(),
+ ss.getTps(),
+ ss.getAvgpt()));
+ }
+
+ public AtomicLong getValue() {
+ return value;
+ }
+
+
+ public String getStatsKey() {
+ return statsKey;
+ }
+
+
+ public String getStatsName() {
+ return statsName;
+ }
+
+
+ public AtomicLong getTimes() {
+ return times;
+ }
+}
+
+
+class CallSnapshot {
+ private final long timestamp;
+ private final long times;
+
+ private final long value;
+
+
+ public CallSnapshot(long timestamp, long times, long value) {
+ super();
+ this.timestamp = timestamp;
+ this.times = times;
+ this.value = value;
+ }
+
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+
+ public long getTimes() {
+ return times;
+ }
+
+
+ public long getValue() {
+ return value;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/StatsItemSet.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/StatsItemSet.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/StatsItemSet.java
new file mode 100644
index 0000000..8a2b2a1
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/StatsItemSet.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.stats;
+
+import com.alibaba.rocketmq.common.UtilAll;
+import org.slf4j.Logger;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+
+public class StatsItemSet {
+ private final ConcurrentHashMap<String/* key */, StatsItem> statsItemTable =
+ new ConcurrentHashMap<String, StatsItem>(128);
+
+ private final String statsName;
+ private final ScheduledExecutorService scheduledExecutorService;
+ private final Logger log;
+
+
+ public StatsItemSet(String statsName, ScheduledExecutorService scheduledExecutorService, Logger log) {
+ this.statsName = statsName;
+ this.scheduledExecutorService = scheduledExecutorService;
+ this.log = log;
+ this.init();
+ }
+
+ public void init() {
+
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ samplingInSeconds();
+ } catch (Throwable e) {
+ }
+ }
+ }, 0, 10, TimeUnit.SECONDS);
+
+
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ samplingInMinutes();
+ } catch (Throwable e) {
+ }
+ }
+ }, 0, 10, TimeUnit.MINUTES);
+
+
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ samplingInHour();
+ } catch (Throwable e) {
+ }
+ }
+ }, 0, 1, TimeUnit.HOURS);
+
+
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ printAtMinutes();
+ } catch (Throwable e) {
+ }
+ }
+ }, Math.abs(UtilAll.computNextMinutesTimeMillis() - System.currentTimeMillis()), 1000 * 60, TimeUnit.MILLISECONDS);
+
+
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ printAtHour();
+ } catch (Throwable e) {
+ }
+ }
+ }, Math.abs(UtilAll.computNextHourTimeMillis() - System.currentTimeMillis()), 1000 * 60 * 60, TimeUnit.MILLISECONDS);
+
+
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ printAtDay();
+ } catch (Throwable e) {
+ }
+ }
+ }, Math.abs(UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis()), 1000 * 60 * 60 * 24, TimeUnit.MILLISECONDS);
+ }
+
+ private void samplingInSeconds() {
+ Iterator<Entry<String, StatsItem>> it = this.statsItemTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<String, StatsItem> next = it.next();
+ next.getValue().samplingInSeconds();
+ }
+ }
+
+ private void samplingInMinutes() {
+ Iterator<Entry<String, StatsItem>> it = this.statsItemTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<String, StatsItem> next = it.next();
+ next.getValue().samplingInMinutes();
+ }
+ }
+
+ private void samplingInHour() {
+ Iterator<Entry<String, StatsItem>> it = this.statsItemTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<String, StatsItem> next = it.next();
+ next.getValue().samplingInHour();
+ }
+ }
+
+ private void printAtMinutes() {
+ Iterator<Entry<String, StatsItem>> it = this.statsItemTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<String, StatsItem> next = it.next();
+ next.getValue().printAtMinutes();
+ }
+ }
+
+ private void printAtHour() {
+ Iterator<Entry<String, StatsItem>> it = this.statsItemTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<String, StatsItem> next = it.next();
+ next.getValue().printAtHour();
+ }
+ }
+
+ private void printAtDay() {
+ Iterator<Entry<String, StatsItem>> it = this.statsItemTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<String, StatsItem> next = it.next();
+ next.getValue().printAtDay();
+ }
+ }
+
+ public void addValue(final String statsKey, final int incValue, final int incTimes) {
+ StatsItem statsItem = this.getAndCreateStatsItem(statsKey);
+ statsItem.getValue().addAndGet(incValue);
+ statsItem.getTimes().addAndGet(incTimes);
+ }
+
+ public StatsItem getAndCreateStatsItem(final String statsKey) {
+ StatsItem statsItem = this.statsItemTable.get(statsKey);
+ if (null == statsItem) {
+ statsItem = new StatsItem(this.statsName, statsKey, this.scheduledExecutorService, this.log);
+ StatsItem prev = this.statsItemTable.put(statsKey, statsItem);
+
+ if (null == prev) {
+
+ // statsItem.init();
+ }
+ }
+
+ return statsItem;
+ }
+
+ public StatsSnapshot getStatsDataInMinute(final String statsKey) {
+ StatsItem statsItem = this.statsItemTable.get(statsKey);
+ if (null != statsItem) {
+ return statsItem.getStatsDataInMinute();
+ }
+ return new StatsSnapshot();
+ }
+
+ public StatsSnapshot getStatsDataInHour(final String statsKey) {
+ StatsItem statsItem = this.statsItemTable.get(statsKey);
+ if (null != statsItem) {
+ return statsItem.getStatsDataInHour();
+ }
+ return new StatsSnapshot();
+ }
+
+ public StatsSnapshot getStatsDataInDay(final String statsKey) {
+ StatsItem statsItem = this.statsItemTable.get(statsKey);
+ if (null != statsItem) {
+ return statsItem.getStatsDataInDay();
+ }
+ return new StatsSnapshot();
+ }
+
+ public StatsItem getStatsItem(final String statsKey) {
+ return this.statsItemTable.get(statsKey);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/StatsSnapshot.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/StatsSnapshot.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/StatsSnapshot.java
new file mode 100644
index 0000000..4092a2b
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/StatsSnapshot.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.stats;
+
+public class StatsSnapshot {
+ private long sum;
+ private double tps;
+ private double avgpt;
+
+
+ public long getSum() {
+ return sum;
+ }
+
+
+ public void setSum(long sum) {
+ this.sum = sum;
+ }
+
+
+ public double getTps() {
+ return tps;
+ }
+
+
+ public void setTps(double tps) {
+ this.tps = tps;
+ }
+
+
+ public double getAvgpt() {
+ return avgpt;
+ }
+
+
+ public void setAvgpt(double avgpt) {
+ this.avgpt = avgpt;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/subscription/SubscriptionGroupConfig.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/subscription/SubscriptionGroupConfig.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/subscription/SubscriptionGroupConfig.java
new file mode 100644
index 0000000..cf8baf2
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/subscription/SubscriptionGroupConfig.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.subscription;
+
+import com.alibaba.rocketmq.common.MixAll;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class SubscriptionGroupConfig {
+
+ private String groupName;
+
+ private boolean consumeEnable = true;
+ private boolean consumeFromMinEnable = true;
+
+ private boolean consumeBroadcastEnable = true;
+
+ private int retryQueueNums = 1;
+
+ private int retryMaxTimes = 16;
+
+ private long brokerId = MixAll.MASTER_ID;
+
+ private long whichBrokerWhenConsumeSlowly = 1;
+
+ private boolean notifyConsumerIdsChangedEnable = true;
+
+
+ public String getGroupName() {
+ return groupName;
+ }
+
+
+ public void setGroupName(String groupName) {
+ this.groupName = groupName;
+ }
+
+
+ public boolean isConsumeEnable() {
+ return consumeEnable;
+ }
+
+
+ public void setConsumeEnable(boolean consumeEnable) {
+ this.consumeEnable = consumeEnable;
+ }
+
+
+ public boolean isConsumeFromMinEnable() {
+ return consumeFromMinEnable;
+ }
+
+
+ public void setConsumeFromMinEnable(boolean consumeFromMinEnable) {
+ this.consumeFromMinEnable = consumeFromMinEnable;
+ }
+
+
+ public boolean isConsumeBroadcastEnable() {
+ return consumeBroadcastEnable;
+ }
+
+
+ public void setConsumeBroadcastEnable(boolean consumeBroadcastEnable) {
+ this.consumeBroadcastEnable = consumeBroadcastEnable;
+ }
+
+
+ public int getRetryQueueNums() {
+ return retryQueueNums;
+ }
+
+
+ public void setRetryQueueNums(int retryQueueNums) {
+ this.retryQueueNums = retryQueueNums;
+ }
+
+
+ public int getRetryMaxTimes() {
+ return retryMaxTimes;
+ }
+
+
+ public void setRetryMaxTimes(int retryMaxTimes) {
+ this.retryMaxTimes = retryMaxTimes;
+ }
+
+
+ public long getBrokerId() {
+ return brokerId;
+ }
+
+
+ public void setBrokerId(long brokerId) {
+ this.brokerId = brokerId;
+ }
+
+
+ public long getWhichBrokerWhenConsumeSlowly() {
+ return whichBrokerWhenConsumeSlowly;
+ }
+
+
+ public void setWhichBrokerWhenConsumeSlowly(long whichBrokerWhenConsumeSlowly) {
+ this.whichBrokerWhenConsumeSlowly = whichBrokerWhenConsumeSlowly;
+ }
+
+ public boolean isNotifyConsumerIdsChangedEnable() {
+ return notifyConsumerIdsChangedEnable;
+ }
+
+ public void setNotifyConsumerIdsChangedEnable(final boolean notifyConsumerIdsChangedEnable) {
+ this.notifyConsumerIdsChangedEnable = notifyConsumerIdsChangedEnable;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + (int) (brokerId ^ (brokerId >>> 32));
+ result = prime * result + (consumeBroadcastEnable ? 1231 : 1237);
+ result = prime * result + (consumeEnable ? 1231 : 1237);
+ result = prime * result + (consumeFromMinEnable ? 1231 : 1237);
+ result = prime * result + (notifyConsumerIdsChangedEnable ? 1231 : 1237);
+ result = prime * result + ((groupName == null) ? 0 : groupName.hashCode());
+ result = prime * result + retryMaxTimes;
+ result = prime * result + retryQueueNums;
+ result =
+ prime * result + (int) (whichBrokerWhenConsumeSlowly ^ (whichBrokerWhenConsumeSlowly >>> 32));
+ return result;
+ }
+
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ SubscriptionGroupConfig other = (SubscriptionGroupConfig) obj;
+ if (brokerId != other.brokerId)
+ return false;
+ if (consumeBroadcastEnable != other.consumeBroadcastEnable)
+ return false;
+ if (consumeEnable != other.consumeEnable)
+ return false;
+ if (consumeFromMinEnable != other.consumeFromMinEnable)
+ return false;
+ if (groupName == null) {
+ if (other.groupName != null)
+ return false;
+ } else if (!groupName.equals(other.groupName))
+ return false;
+ if (retryMaxTimes != other.retryMaxTimes)
+ return false;
+ if (retryQueueNums != other.retryQueueNums)
+ return false;
+ if (whichBrokerWhenConsumeSlowly != other.whichBrokerWhenConsumeSlowly)
+ return false;
+ if (notifyConsumerIdsChangedEnable != other.notifyConsumerIdsChangedEnable)
+ return false;
+ return true;
+ }
+
+
+ @Override
+ public String toString() {
+ return "SubscriptionGroupConfig [groupName=" + groupName + ", consumeEnable=" + consumeEnable
+ + ", consumeFromMinEnable=" + consumeFromMinEnable + ", consumeBroadcastEnable="
+ + consumeBroadcastEnable + ", retryQueueNums=" + retryQueueNums + ", retryMaxTimes="
+ + retryMaxTimes + ", brokerId=" + brokerId + ", whichBrokerWhenConsumeSlowly="
+ + whichBrokerWhenConsumeSlowly + ", notifyConsumerIdsChangedEnable="
+ + notifyConsumerIdsChangedEnable + "]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/sysflag/MessageSysFlag.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/sysflag/MessageSysFlag.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/sysflag/MessageSysFlag.java
new file mode 100644
index 0000000..2f9d057
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/sysflag/MessageSysFlag.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.common.sysflag;
+
+/**
+ * @author shijia.wxr
+ */
+public class MessageSysFlag {
+ public final static int COMPRESSED_FLAG = 0x1;
+ public final static int MULTI_TAGS_FLAG = 0x1 << 1;
+ public final static int TRANSACTION_NOT_TYPE = 0;
+ public final static int TRANSACTION_PREPARED_TYPE = 0x1 << 2;
+ public final static int TRANSACTION_COMMIT_TYPE = 0x2 << 2;
+ public final static int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2;
+
+
+ public static int getTransactionValue(final int flag) {
+ return flag & TRANSACTION_ROLLBACK_TYPE;
+ }
+
+
+ public static int resetTransactionValue(final int flag, final int type) {
+ return (flag & (~TRANSACTION_ROLLBACK_TYPE)) | type;
+ }
+
+
+ public static int clearCompressedFlag(final int flag) {
+ return flag & (~COMPRESSED_FLAG);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/sysflag/PullSysFlag.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/sysflag/PullSysFlag.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/sysflag/PullSysFlag.java
new file mode 100644
index 0000000..d0f7287
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/sysflag/PullSysFlag.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.common.sysflag;
+
+/**
+ * @author shijia.wxr
+ */
+public class PullSysFlag {
+ private final static int FLAG_COMMIT_OFFSET = 0x1 << 0;
+ private final static int FLAG_SUSPEND = 0x1 << 1;
+ private final static int FLAG_SUBSCRIPTION = 0x1 << 2;
+ private final static int FLAG_CLASS_FILTER = 0x1 << 3;
+
+
+ public static int buildSysFlag(final boolean commitOffset, final boolean suspend,
+ final boolean subscription, final boolean classFilter) {
+ int flag = 0;
+
+ if (commitOffset) {
+ flag |= FLAG_COMMIT_OFFSET;
+ }
+
+ if (suspend) {
+ flag |= FLAG_SUSPEND;
+ }
+
+ if (subscription) {
+ flag |= FLAG_SUBSCRIPTION;
+ }
+
+ if (classFilter) {
+ flag |= FLAG_CLASS_FILTER;
+ }
+
+ return flag;
+ }
+
+
+ public static int clearCommitOffsetFlag(final int sysFlag) {
+ return sysFlag & (~FLAG_COMMIT_OFFSET);
+ }
+
+
+ public static boolean hasCommitOffsetFlag(final int sysFlag) {
+ return (sysFlag & FLAG_COMMIT_OFFSET) == FLAG_COMMIT_OFFSET;
+ }
+
+
+ public static boolean hasSuspendFlag(final int sysFlag) {
+ return (sysFlag & FLAG_SUSPEND) == FLAG_SUSPEND;
+ }
+
+
+ public static boolean hasSubscriptionFlag(final int sysFlag) {
+ return (sysFlag & FLAG_SUBSCRIPTION) == FLAG_SUBSCRIPTION;
+ }
+
+
+ public static boolean hasClassFilterFlag(final int sysFlag) {
+ return (sysFlag & FLAG_CLASS_FILTER) == FLAG_CLASS_FILTER;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/sysflag/SubscriptionSysFlag.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/sysflag/SubscriptionSysFlag.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/sysflag/SubscriptionSysFlag.java
new file mode 100644
index 0000000..65e3115
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/sysflag/SubscriptionSysFlag.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.common.sysflag;
+
+/**
+ * @author manhong.yqd
+ */
+public class SubscriptionSysFlag {
+
+ private final static int FLAG_UNIT = 0x1 << 0;
+
+
+ public static int buildSysFlag(final boolean unit) {
+ int sysFlag = 0;
+
+ if (unit) {
+ sysFlag |= FLAG_UNIT;
+ }
+
+ return sysFlag;
+ }
+
+
+ public static int setUnitFlag(final int sysFlag) {
+ return sysFlag | FLAG_UNIT;
+ }
+
+
+ public static int clearUnitFlag(final int sysFlag) {
+ return sysFlag & (~FLAG_UNIT);
+ }
+
+
+ public static boolean hasUnitFlag(final int sysFlag) {
+ return (sysFlag & FLAG_UNIT) == FLAG_UNIT;
+ }
+
+
+ public static void main(String[] args) {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/sysflag/TopicSysFlag.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/sysflag/TopicSysFlag.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/sysflag/TopicSysFlag.java
new file mode 100644
index 0000000..90d48f4
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/sysflag/TopicSysFlag.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.common.sysflag;
+
+/**
+
+ *
+ * @author manhong.yqd
+ *
+ */
+public class TopicSysFlag {
+
+ private final static int FLAG_UNIT = 0x1 << 0;
+
+ private final static int FLAG_UNIT_SUB = 0x1 << 1;
+
+
+ public static int buildSysFlag(final boolean unit, final boolean hasUnitSub) {
+ int sysFlag = 0;
+
+ if (unit) {
+ sysFlag |= FLAG_UNIT;
+ }
+
+ if (hasUnitSub) {
+ sysFlag |= FLAG_UNIT_SUB;
+ }
+
+ return sysFlag;
+ }
+
+
+ public static int setUnitFlag(final int sysFlag) {
+ return sysFlag | FLAG_UNIT;
+ }
+
+
+ public static int clearUnitFlag(final int sysFlag) {
+ return sysFlag & (~FLAG_UNIT);
+ }
+
+
+ public static boolean hasUnitFlag(final int sysFlag) {
+ return (sysFlag & FLAG_UNIT) == FLAG_UNIT;
+ }
+
+
+ public static int setUnitSubFlag(final int sysFlag) {
+ return sysFlag | FLAG_UNIT_SUB;
+ }
+
+
+ public static int clearUnitSubFlag(final int sysFlag) {
+ return sysFlag & (~FLAG_UNIT_SUB);
+ }
+
+
+ public static boolean hasUnitSubFlag(final int sysFlag) {
+ return (sysFlag & FLAG_UNIT_SUB) == FLAG_UNIT_SUB;
+ }
+
+
+ public static void main(String[] args) {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/utils/ChannelUtil.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/utils/ChannelUtil.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/utils/ChannelUtil.java
new file mode 100644
index 0000000..444928f
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/utils/ChannelUtil.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.utils;
+
+import io.netty.channel.Channel;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+public class ChannelUtil {
+ public static String getRemoteIp(Channel channel) {
+ InetSocketAddress inetSocketAddress = (InetSocketAddress) channel.remoteAddress();
+ if (inetSocketAddress == null) {
+ return "";
+ }
+ final InetAddress inetAddr = inetSocketAddress.getAddress();
+ return inetAddr != null ? inetAddr.getHostAddress() : inetSocketAddress.getHostName();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/utils/HttpTinyClient.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/utils/HttpTinyClient.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/utils/HttpTinyClient.java
new file mode 100755
index 0000000..dadac46
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/utils/HttpTinyClient.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.utils;
+
+import com.alibaba.rocketmq.common.MQVersion;
+import com.alibaba.rocketmq.common.MixAll;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.util.Iterator;
+import java.util.List;
+
+
+public class HttpTinyClient {
+
+ static public HttpResult httpGet(String url, List<String> headers, List<String> paramValues,
+ String encoding, long readTimeoutMs) throws IOException {
+ String encodedContent = encodingParams(paramValues, encoding);
+ url += (null == encodedContent) ? "" : ("?" + encodedContent);
+
+ HttpURLConnection conn = null;
+ try {
+ conn = (HttpURLConnection) new URL(url).openConnection();
+ conn.setRequestMethod("GET");
+ conn.setConnectTimeout((int) readTimeoutMs);
+ conn.setReadTimeout((int) readTimeoutMs);
+ setHeaders(conn, headers, encoding);
+
+ conn.connect();
+ int respCode = conn.getResponseCode();
+ String resp = null;
+
+ if (HttpURLConnection.HTTP_OK == respCode) {
+ resp = IOTinyUtils.toString(conn.getInputStream(), encoding);
+ } else {
+ resp = IOTinyUtils.toString(conn.getErrorStream(), encoding);
+ }
+ return new HttpResult(respCode, resp);
+ } finally {
+ if (conn != null) {
+ conn.disconnect();
+ }
+ }
+ }
+
+ static private String encodingParams(List<String> paramValues, String encoding)
+ throws UnsupportedEncodingException {
+ StringBuilder sb = new StringBuilder();
+ if (null == paramValues) {
+ return null;
+ }
+
+ for (Iterator<String> iter = paramValues.iterator(); iter.hasNext(); ) {
+ sb.append(iter.next()).append("=");
+ sb.append(URLEncoder.encode(iter.next(), encoding));
+ if (iter.hasNext()) {
+ sb.append("&");
+ }
+ }
+ return sb.toString();
+ }
+
+ static private void setHeaders(HttpURLConnection conn, List<String> headers, String encoding) {
+ if (null != headers) {
+ for (Iterator<String> iter = headers.iterator(); iter.hasNext(); ) {
+ conn.addRequestProperty(iter.next(), iter.next());
+ }
+ }
+ conn.addRequestProperty("Client-Version", MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION));
+ conn.addRequestProperty("Content-Type", "application/x-www-form-urlencoded;charset=" + encoding);
+
+
+ String ts = String.valueOf(System.currentTimeMillis());
+ conn.addRequestProperty("Metaq-Client-RequestTS", ts);
+ }
+
+ /**
+
+ *
+ * @param url
+ * @param headers
+
+ * @param paramValues
+
+ * @param encoding
+
+ * @param readTimeoutMs
+
+ *
+ * @return the http response of given http post request
+ *
+ * @throws java.io.IOException
+ */
+ static public HttpResult httpPost(String url, List<String> headers, List<String> paramValues,
+ String encoding, long readTimeoutMs) throws IOException {
+ String encodedContent = encodingParams(paramValues, encoding);
+
+ HttpURLConnection conn = null;
+ try {
+ conn = (HttpURLConnection) new URL(url).openConnection();
+ conn.setRequestMethod("POST");
+ conn.setConnectTimeout(3000);
+ conn.setReadTimeout((int) readTimeoutMs);
+ conn.setDoOutput(true);
+ conn.setDoInput(true);
+ setHeaders(conn, headers, encoding);
+
+ conn.getOutputStream().write(encodedContent.getBytes(MixAll.DEFAULT_CHARSET));
+
+ int respCode = conn.getResponseCode();
+ String resp = null;
+
+ if (HttpURLConnection.HTTP_OK == respCode) {
+ resp = IOTinyUtils.toString(conn.getInputStream(), encoding);
+ } else {
+ resp = IOTinyUtils.toString(conn.getErrorStream(), encoding);
+ }
+ return new HttpResult(respCode, resp);
+ } finally {
+ if (null != conn) {
+ conn.disconnect();
+ }
+ }
+ }
+
+ static public class HttpResult {
+ final public int code;
+ final public String content;
+
+
+ public HttpResult(int code, String content) {
+ this.code = code;
+ this.content = content;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/utils/IOTinyUtils.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/utils/IOTinyUtils.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/utils/IOTinyUtils.java
new file mode 100644
index 0000000..ced2fae
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/utils/IOTinyUtils.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.utils;
+
+import com.alibaba.rocketmq.remoting.common.RemotingHelper;
+
+import java.io.*;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * @author manhong.yqd
+ */
+public class IOTinyUtils {
+
+ static public String toString(InputStream input, String encoding) throws IOException {
+ return (null == encoding) ? toString(new InputStreamReader(input, RemotingHelper.DEFAULT_CHARSET)) : toString(new InputStreamReader(
+ input, encoding));
+ }
+
+
+ static public String toString(Reader reader) throws IOException {
+ CharArrayWriter sw = new CharArrayWriter();
+ copy(reader, sw);
+ return sw.toString();
+ }
+
+
+ static public long copy(Reader input, Writer output) throws IOException {
+ char[] buffer = new char[1 << 12];
+ long count = 0;
+ for (int n = 0; (n = input.read(buffer)) >= 0; ) {
+ output.write(buffer, 0, n);
+ count += n;
+ }
+ return count;
+ }
+
+
+ /**
+
+ */
+ static public List<String> readLines(Reader input) throws IOException {
+ BufferedReader reader = toBufferedReader(input);
+ List<String> list = new ArrayList<String>();
+ String line = null;
+ for (;;) {
+ line = reader.readLine();
+ if (null != line) {
+ list.add(line);
+ } else {
+ break;
+ }
+ }
+ return list;
+ }
+
+
+ static private BufferedReader toBufferedReader(Reader reader) {
+ return reader instanceof BufferedReader ? (BufferedReader) reader : new BufferedReader(reader);
+ }
+
+
+ static public void copyFile(String source, String target) throws IOException {
+ File sf = new File(source);
+ if (!sf.exists()) {
+ throw new IllegalArgumentException("source file does not exist.");
+ }
+ File tf = new File(target);
+ tf.getParentFile().mkdirs();
+ if (!tf.exists() && !tf.createNewFile()) {
+ throw new RuntimeException("failed to create target file.");
+ }
+
+ FileChannel sc = null;
+ FileChannel tc = null;
+ try {
+ tc = new FileOutputStream(tf).getChannel();
+ sc = new FileInputStream(sf).getChannel();
+ sc.transferTo(0, sc.size(), tc);
+ } finally {
+ if (null != sc) {
+ sc.close();
+ }
+ if (null != tc) {
+ tc.close();
+ }
+ }
+ }
+
+
+ public static void delete(File fileOrDir) throws IOException {
+ if (fileOrDir == null) {
+ return;
+ }
+
+ if (fileOrDir.isDirectory()) {
+ cleanDirectory(fileOrDir);
+ }
+
+ fileOrDir.delete();
+ }
+
+
+ /**
+
+ */
+ public static void cleanDirectory(File directory) throws IOException {
+ if (!directory.exists()) {
+ String message = directory + " does not exist";
+ throw new IllegalArgumentException(message);
+ }
+
+ if (!directory.isDirectory()) {
+ String message = directory + " is not a directory";
+ throw new IllegalArgumentException(message);
+ }
+
+ File[] files = directory.listFiles();
+ if (files == null) { // null if security restricted
+ throw new IOException("Failed to list contents of " + directory);
+ }
+
+ IOException exception = null;
+ for (File file : files) {
+ try {
+ delete(file);
+ } catch (IOException ioe) {
+ exception = ioe;
+ }
+ }
+
+ if (null != exception) {
+ throw exception;
+ }
+ }
+
+
+ public static void writeStringToFile(File file, String data, String encoding) throws IOException {
+ OutputStream os = null;
+ try {
+ os = new FileOutputStream(file);
+ os.write(data.getBytes(encoding));
+ } finally {
+ if (null != os) {
+ os.close();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/MixAllTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/MixAllTest.java b/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/MixAllTest.java
new file mode 100644
index 0000000..72e02d0
--- /dev/null
+++ b/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/MixAllTest.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common;
+
+import junit.framework.Assert;
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.util.List;
+
+
+/**
+ * @author lansheng.zj
+ */
+public class MixAllTest {
+
+ @Test
+ public void test() throws Exception {
+ List<String> localInetAddress = MixAll.getLocalInetAddress();
+ String local = InetAddress.getLocalHost().getHostAddress();
+ Assert.assertTrue(localInetAddress.contains("127.0.0.1"));
+ Assert.assertTrue(localInetAddress.contains(local));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/RemotingUtilTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/RemotingUtilTest.java b/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/RemotingUtilTest.java
new file mode 100644
index 0000000..e6468b9
--- /dev/null
+++ b/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/RemotingUtilTest.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.common;
+
+import com.alibaba.rocketmq.remoting.common.RemotingUtil;
+import org.junit.Test;
+
+
+public class RemotingUtilTest {
+ @Test
+ public void test() throws Exception {
+ String a = RemotingUtil.getLocalAddress();
+ System.out.println(a);
+ }
+}