You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/19 09:40:36 UTC

[19/43] incubator-rocketmq git commit: Finish code dump. Reviewed by: @yukon @vongosling @stevenschew @vintagewang @lollipop @zander

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/heartbeat/SubscriptionData.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/heartbeat/SubscriptionData.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/heartbeat/SubscriptionData.java
new file mode 100644
index 0000000..b4b72fc
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/heartbeat/SubscriptionData.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * $Id: SubscriptionData.java 1835 2013-05-16 02:00:50Z shijia.wxr $
+ */
+package com.alibaba.rocketmq.common.protocol.heartbeat;
+
+import com.alibaba.fastjson.annotation.JSONField;
+
+import java.util.HashSet;
+import java.util.Set;
+
+
+/** Bean holding a topic, a subscription string, tag/code sets, a class-filter flag and a version.
+ * @author shijia.wxr
+ */
+public class SubscriptionData implements Comparable<SubscriptionData> {
+    public final static String SUB_ALL = "*"; // presumably the "subscribe to everything" expression — confirm against callers
+    private boolean classFilterMode = false;
+    private String topic;
+    private String subString;
+    private Set<String> tagsSet = new HashSet<String>();
+    private Set<Integer> codeSet = new HashSet<Integer>();
+    private long subVersion = System.currentTimeMillis(); // defaults to the construction time in milliseconds
+
+    @JSONField(serialize = false) // asks fastjson not to serialize this field
+    private String filterClassSource;
+
+
+    public SubscriptionData() {
+
+    }
+
+
+    public SubscriptionData(String topic, String subString) {
+        super();
+        this.topic = topic;
+        this.subString = subString;
+    }
+
+    public String getFilterClassSource() {
+        return filterClassSource;
+    }
+
+    public void setFilterClassSource(String filterClassSource) {
+        this.filterClassSource = filterClassSource;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+
+    public String getSubString() {
+        return subString;
+    }
+
+
+    public void setSubString(String subString) {
+        this.subString = subString;
+    }
+
+
+    public Set<String> getTagsSet() {
+        return tagsSet; // returns the internal set itself, not a copy
+    }
+
+
+    public void setTagsSet(Set<String> tagsSet) {
+        this.tagsSet = tagsSet;
+    }
+
+
+    public long getSubVersion() {
+        return subVersion;
+    }
+
+
+    public void setSubVersion(long subVersion) {
+        this.subVersion = subVersion;
+    }
+
+
+    public Set<Integer> getCodeSet() {
+        return codeSet; // returns the internal set itself, not a copy
+    }
+
+
+    public void setCodeSet(Set<Integer> codeSet) {
+        this.codeSet = codeSet;
+    }
+
+
+    public boolean isClassFilterMode() {
+        return classFilterMode;
+    }
+
+
+    public void setClassFilterMode(boolean classFilterMode) {
+        this.classFilterMode = classFilterMode;
+    }
+
+
+    @Override
+    public int hashCode() { // hashes classFilterMode, codeSet, subString, tagsSet, topic; not subVersion or filterClassSource
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + (classFilterMode ? 1231 : 1237);
+        result = prime * result + ((codeSet == null) ? 0 : codeSet.hashCode());
+        result = prime * result + ((subString == null) ? 0 : subString.hashCode());
+        result = prime * result + ((tagsSet == null) ? 0 : tagsSet.hashCode());
+        result = prime * result + ((topic == null) ? 0 : topic.hashCode());
+        return result;
+    }
+
+
+    @Override
+    public boolean equals(Object obj) { // null-safe field comparison; filterClassSource is not compared
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        SubscriptionData other = (SubscriptionData) obj;
+        if (classFilterMode != other.classFilterMode)
+            return false;
+        if (codeSet == null) {
+            if (other.codeSet != null)
+                return false;
+        } else if (!codeSet.equals(other.codeSet))
+            return false;
+        if (subString == null) {
+            if (other.subString != null)
+                return false;
+        } else if (!subString.equals(other.subString))
+            return false;
+        if (subVersion != other.subVersion) // compared here although hashCode() does not hash it
+            return false;
+        if (tagsSet == null) {
+            if (other.tagsSet != null)
+                return false;
+        } else if (!tagsSet.equals(other.tagsSet))
+            return false;
+        if (topic == null) {
+            if (other.topic != null)
+                return false;
+        } else if (!topic.equals(other.topic))
+            return false;
+        return true;
+    }
+
+
+    @Override
+    public String toString() { // filterClassSource is not part of the output
+        return "SubscriptionData [classFilterMode=" + classFilterMode + ", topic=" + topic + ", subString="
+                + subString + ", tagsSet=" + tagsSet + ", codeSet=" + codeSet + ", subVersion=" + subVersion
+                + "]";
+    }
+
+
+    @Override
+    public int compareTo(SubscriptionData other) { // NOTE: ignores fields equals() checks, so 0 here does not imply equals()
+        String thisValue = this.topic + "@" + this.subString; // a null field is concatenated as the text "null"
+        String otherValue = other.topic + "@" + other.subString;
+        return thisValue.compareTo(otherValue);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/route/BrokerData.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/route/BrokerData.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/route/BrokerData.java
new file mode 100644
index 0000000..322953a
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/route/BrokerData.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * $Id: BrokerData.java 1835 2013-05-16 02:00:50Z shijia.wxr $
+ */
+package com.alibaba.rocketmq.common.protocol.route;
+
+import com.alibaba.rocketmq.common.MixAll;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+/** Bean holding a cluster name, a broker name and a brokerId-to-address map.
+ * @author shijia.wxr
+ *
+ */
+public class BrokerData implements Comparable<BrokerData> {
+    private String cluster;
+    private String brokerName;
+    private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs; // no initializer: null until setBrokerAddrs is called
+
+    public String selectBrokerAddr() { // address stored under MixAll.MASTER_ID, else any other entry, else null
+        String value = this.brokerAddrs.get(MixAll.MASTER_ID); // throws NullPointerException if brokerAddrs is still null
+        if (null == value) {
+            for (Map.Entry<Long, String> entry : this.brokerAddrs.entrySet()) {
+                return entry.getValue(); // returns on the first iteration; HashMap iteration order is unspecified
+            }
+        }
+
+        return value; // null here means the map is empty
+    }
+
+    public HashMap<Long, String> getBrokerAddrs() {
+        return brokerAddrs;
+    }
+
+    public void setBrokerAddrs(HashMap<Long, String> brokerAddrs) {
+        this.brokerAddrs = brokerAddrs;
+    }
+
+    public String getCluster() {
+        return cluster;
+    }
+
+    public void setCluster(String cluster) {
+        this.cluster = cluster;
+    }
+
+    @Override
+    public int hashCode() { // based on brokerAddrs and brokerName; cluster is not included
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((brokerAddrs == null) ? 0 : brokerAddrs.hashCode());
+        result = prime * result + ((brokerName == null) ? 0 : brokerName.hashCode());
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) { // compares brokerAddrs and brokerName null-safely; cluster is not compared
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        BrokerData other = (BrokerData) obj;
+        if (brokerAddrs == null) {
+            if (other.brokerAddrs != null)
+                return false;
+        } else if (!brokerAddrs.equals(other.brokerAddrs))
+            return false;
+        if (brokerName == null) {
+            if (other.brokerName != null)
+                return false;
+        } else if (!brokerName.equals(other.brokerName))
+            return false;
+        return true;
+    }
+
+    @Override
+    public String toString() { // cluster is not part of the output
+        return "BrokerData [brokerName=" + brokerName + ", brokerAddrs=" + brokerAddrs + "]";
+    }
+
+    @Override
+    public int compareTo(BrokerData o) { // orders by brokerName only, unlike equals(); no null check on brokerName
+        return this.brokerName.compareTo(o.getBrokerName());
+    }
+
+    public String getBrokerName() {
+        return brokerName;
+    }
+
+    public void setBrokerName(String brokerName) {
+        this.brokerName = brokerName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/route/QueueData.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/route/QueueData.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/route/QueueData.java
new file mode 100644
index 0000000..6f62340
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/route/QueueData.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * $Id: QueueData.java 1835 2013-05-16 02:00:50Z shijia.wxr $
+ */
+package com.alibaba.rocketmq.common.protocol.route;
+
+public class QueueData implements Comparable<QueueData> { // bean: brokerName plus readQueueNums, writeQueueNums, perm, topicSynFlag
+    private String brokerName;
+    private int readQueueNums;
+    private int writeQueueNums;
+    private int perm;
+    private int topicSynFlag;
+
+    public int getReadQueueNums() {
+        return readQueueNums;
+    }
+
+    public void setReadQueueNums(int readQueueNums) {
+        this.readQueueNums = readQueueNums;
+    }
+
+    public int getWriteQueueNums() {
+        return writeQueueNums;
+    }
+
+    public void setWriteQueueNums(int writeQueueNums) {
+        this.writeQueueNums = writeQueueNums;
+    }
+
+    public int getPerm() {
+        return perm;
+    }
+
+    public void setPerm(int perm) {
+        this.perm = perm;
+    }
+
+    public int getTopicSynFlag() {
+        return topicSynFlag;
+    }
+
+    public void setTopicSynFlag(int topicSynFlag) {
+        this.topicSynFlag = topicSynFlag;
+    }
+
+    @Override
+    public int hashCode() { // combines all five fields
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((brokerName == null) ? 0 : brokerName.hashCode());
+        result = prime * result + perm;
+        result = prime * result + readQueueNums;
+        result = prime * result + writeQueueNums;
+        result = prime * result + topicSynFlag;
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) { // same class and all five fields equal (brokerName compared null-safely)
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        QueueData other = (QueueData) obj;
+        if (brokerName == null) {
+            if (other.brokerName != null)
+                return false;
+        } else if (!brokerName.equals(other.brokerName))
+            return false;
+        if (perm != other.perm)
+            return false;
+        if (readQueueNums != other.readQueueNums)
+            return false;
+        if (writeQueueNums != other.writeQueueNums)
+            return false;
+        if (topicSynFlag != other.topicSynFlag)
+            return false;
+        return true;
+    }
+
+    @Override
+    public String toString() {
+        return "QueueData [brokerName=" + brokerName + ", readQueueNums=" + readQueueNums
+                + ", writeQueueNums=" + writeQueueNums + ", perm=" + perm + ", topicSynFlag=" + topicSynFlag
+                + "]";
+    }
+
+    @Override
+    public int compareTo(QueueData o) { // orders by brokerName only, unlike equals(); no null check on brokerName
+        return this.brokerName.compareTo(o.getBrokerName());
+    }
+
+    public String getBrokerName() {
+        return brokerName;
+    }
+
+    public void setBrokerName(String brokerName) {
+        this.brokerName = brokerName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/route/TopicRouteData.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/route/TopicRouteData.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/route/TopicRouteData.java
new file mode 100644
index 0000000..72e1b96
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/route/TopicRouteData.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/**
+ * $Id: TopicRouteData.java 1835 2013-05-16 02:00:50Z shijia.wxr $
+ */
+package com.alibaba.rocketmq.common.protocol.route;
+
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+
+/** Bean (extends RemotingSerializable) holding orderTopicConf, queue data, broker data and a filter-server table.
+ * @author shijia.wxr
+ */
+public class TopicRouteData extends RemotingSerializable {
+    private String orderTopicConf;
+    private List<QueueData> queueDatas;
+    private List<BrokerData> brokerDatas;
+    private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
+
+
+    public TopicRouteData cloneTopicRouteData() { // shallow copy: new containers, same element objects
+        TopicRouteData topicRouteData = new TopicRouteData();
+        topicRouteData.setQueueDatas(new ArrayList<QueueData>());
+        topicRouteData.setBrokerDatas(new ArrayList<BrokerData>());
+        topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
+        topicRouteData.setOrderTopicConf(this.orderTopicConf);
+
+        if (this.queueDatas != null) { // a null source list leaves the copy with an empty list rather than null
+            topicRouteData.getQueueDatas().addAll(this.queueDatas);
+        }
+
+        if (this.brokerDatas != null) {
+            topicRouteData.getBrokerDatas().addAll(this.brokerDatas);
+        }
+
+        if (this.filterServerTable != null) {
+            topicRouteData.getFilterServerTable().putAll(this.filterServerTable); // the List<String> values are shared, not copied
+        }
+
+        return topicRouteData;
+    }
+
+
+    public List<QueueData> getQueueDatas() {
+        return queueDatas;
+    }
+
+
+    public void setQueueDatas(List<QueueData> queueDatas) {
+        this.queueDatas = queueDatas;
+    }
+
+
+    public List<BrokerData> getBrokerDatas() {
+        return brokerDatas;
+    }
+
+
+    public void setBrokerDatas(List<BrokerData> brokerDatas) {
+        this.brokerDatas = brokerDatas;
+    }
+
+    public HashMap<String, List<String>> getFilterServerTable() {
+        return filterServerTable;
+    }
+
+    public void setFilterServerTable(HashMap<String, List<String>> filterServerTable) {
+        this.filterServerTable = filterServerTable;
+    }
+
+    public String getOrderTopicConf() {
+        return orderTopicConf;
+    }
+
+    public void setOrderTopicConf(String orderTopicConf) {
+        this.orderTopicConf = orderTopicConf;
+    }
+
+    @Override
+    public int hashCode() { // combines all four fields, treating null as 0
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((brokerDatas == null) ? 0 : brokerDatas.hashCode());
+        result = prime * result + ((orderTopicConf == null) ? 0 : orderTopicConf.hashCode());
+        result = prime * result + ((queueDatas == null) ? 0 : queueDatas.hashCode());
+        result = prime * result + ((filterServerTable == null) ? 0 : filterServerTable.hashCode());
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) { // same class and all four fields equal, compared null-safely
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        TopicRouteData other = (TopicRouteData) obj;
+        if (brokerDatas == null) {
+            if (other.brokerDatas != null)
+                return false;
+        } else if (!brokerDatas.equals(other.brokerDatas)) // List.equals is order-sensitive
+            return false;
+        if (orderTopicConf == null) {
+            if (other.orderTopicConf != null)
+                return false;
+        } else if (!orderTopicConf.equals(other.orderTopicConf))
+            return false;
+        if (queueDatas == null) {
+            if (other.queueDatas != null)
+                return false;
+        } else if (!queueDatas.equals(other.queueDatas)) // List.equals is order-sensitive
+            return false;
+        if (filterServerTable == null) {
+            if (other.filterServerTable != null)
+                return false;
+        } else if (!filterServerTable.equals(other.filterServerTable))
+            return false;
+        return true;
+    }
+
+    @Override
+    public String toString() {
+        return "TopicRouteData [orderTopicConf=" + orderTopicConf + ", queueDatas=" + queueDatas
+                + ", brokerDatas=" + brokerDatas + ", filterServerTable=" + filterServerTable + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/topic/OffsetMovedEvent.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/topic/OffsetMovedEvent.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/topic/OffsetMovedEvent.java
new file mode 100644
index 0000000..86bdd3d
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/protocol/topic/OffsetMovedEvent.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.protocol.topic;
+
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+
+
+public class OffsetMovedEvent extends RemotingSerializable { // bean: consumerGroup, messageQueue, offsetRequest, offsetNew
+    private String consumerGroup;
+    private MessageQueue messageQueue;
+    private long offsetRequest; // presumably the offset that was asked for — confirm against the producer of this event
+    private long offsetNew; // presumably the offset used instead — confirm against the producer of this event
+
+
+    public String getConsumerGroup() {
+        return consumerGroup;
+    }
+
+
+    public void setConsumerGroup(String consumerGroup) {
+        this.consumerGroup = consumerGroup;
+    }
+
+
+    public MessageQueue getMessageQueue() {
+        return messageQueue;
+    }
+
+
+    public void setMessageQueue(MessageQueue messageQueue) {
+        this.messageQueue = messageQueue;
+    }
+
+
+    public long getOffsetRequest() {
+        return offsetRequest;
+    }
+
+
+    public void setOffsetRequest(long offsetRequest) {
+        this.offsetRequest = offsetRequest;
+    }
+
+
+    public long getOffsetNew() {
+        return offsetNew;
+    }
+
+
+    public void setOffsetNew(long offsetNew) {
+        this.offsetNew = offsetNew;
+    }
+
+
+    @Override
+    public String toString() { // lists all four fields
+        return "OffsetMovedEvent [consumerGroup=" + consumerGroup + ", messageQueue=" + messageQueue
+                + ", offsetRequest=" + offsetRequest + ", offsetNew=" + offsetNew + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/queue/ConcurrentTreeMap.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/queue/ConcurrentTreeMap.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/queue/ConcurrentTreeMap.java
new file mode 100644
index 0000000..8fc4e76
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/queue/ConcurrentTreeMap.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.queue;
+
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Comparator;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.locks.ReentrantLock;
+
+
+/**
+ * Thread safe: both public methods run while holding a single ReentrantLock.
+ *
+ * @author lansheng.zj
+ */
+public class ConcurrentTreeMap<K, V> {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+    private final ReentrantLock lock;
+    private TreeMap<K, V> tree;
+    private RoundQueue<K> roundQueue;
+
+
+    public ConcurrentTreeMap(int capacity, Comparator<? super K> comparator) { // capacity bounds roundQueue only, not the tree
+        tree = new TreeMap<K, V>(comparator);
+        roundQueue = new RoundQueue<K>(capacity);
+        lock = new ReentrantLock(true); // true = fair ordering policy
+    }
+
+
+    public Map.Entry<K, V> pollFirstEntry() { // removes and returns the tree's first entry, or null if the tree is empty
+        lock.lock();
+        try {
+            return tree.pollFirstEntry(); // the key is not removed from roundQueue
+        } finally {
+            lock.unlock();
+        }
+    }
+
+
+    public V putIfAbsentAndRetExsit(K key, V value) { // returns the value mapped to key after the call; may be null (see else branch)
+        lock.lock();
+        try {
+            if (roundQueue.put(key)) { // true when roundQueue did not already contain key
+                V exsit = tree.get(key);
+                if (null == exsit) {
+                    tree.put(key, value);
+                    exsit = value;
+                }
+                log.warn("putIfAbsentAndRetExsit success. {}", key); // NOTE(review): success is logged at WARN — confirm intended
+                return exsit;
+            }
+
+            else {
+                V exsit = tree.get(key); // null if the entry has already been polled from the tree
+                return exsit;
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/queue/RoundQueue.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/queue/RoundQueue.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/queue/RoundQueue.java
new file mode 100644
index 0000000..a3783ba
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/queue/RoundQueue.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.queue;
+
+import java.util.LinkedList;
+import java.util.Queue;
+
+
+/**
+ * Not thread safe. Bounded FIFO of distinct elements; the head is dropped once capacity is reached.
+ *
+ * @author lansheng.zj
+ */
+public class RoundQueue<E> {
+
+    private Queue<E> queue;
+    private int capacity;
+
+
+    public RoundQueue(int capacity) { // capacity is stored as given; it is not validated
+        this.capacity = capacity;
+        queue = new LinkedList<E>();
+    }
+
+
+    public boolean put(E e) { // returns true if e was added, false if an equal element is already queued
+        boolean ok = false;
+        if (!queue.contains(e)) { // linear scan of the LinkedList using equals()
+            if (queue.size() >= capacity) {
+                queue.poll(); // drop the head (oldest element) to make room
+            }
+            queue.add(e);
+            ok = true;
+        }
+
+        return ok;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/running/RunningStats.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/running/RunningStats.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/running/RunningStats.java
new file mode 100644
index 0000000..aa0bc54
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/running/RunningStats.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.common.running;
+
+public enum RunningStats { // NOTE(review): lowerCamelCase constants, presumably used verbatim as keys — confirm before renaming
+    commitLogMaxOffset,
+    commitLogMinOffset,
+    commitLogDiskRatio,
+    consumeQueueDiskRatio,
+    scheduleMessageOffset,
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/MomentStatsItem.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/MomentStatsItem.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/MomentStatsItem.java
new file mode 100644
index 0000000..89eefa5
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/MomentStatsItem.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.stats;
+
+import com.alibaba.rocketmq.common.UtilAll;
+import org.slf4j.Logger;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+/**
+ * A single named value that can be logged on a fixed schedule and then reset to zero.
+ * <p>
+ * The value lives in an {@link AtomicLong} that callers read and modify directly via
+ * {@link #getValue()}. Nothing is scheduled until {@link #init()} is invoked.
+ */
+public class MomentStatsItem {
+
+    private final String statsName;
+    private final String statsKey;
+    private final ScheduledExecutorService scheduledExecutorService;
+    private final Logger log;
+
+    /** Current value; starts at zero. */
+    private final AtomicLong value = new AtomicLong(0);
+
+
+    /**
+     * @param statsName name printed first in every log line
+     * @param statsKey key printed second in every log line
+     * @param scheduledExecutorService executor used by {@link #init()} for the periodic task
+     * @param log logger that receives the periodic output
+     */
+    public MomentStatsItem(String statsName, String statsKey,
+                           ScheduledExecutorService scheduledExecutorService, Logger log) {
+        this.statsName = statsName;
+        this.statsKey = statsKey;
+        this.scheduledExecutorService = scheduledExecutorService;
+        this.log = log;
+    }
+
+
+    /**
+     * Schedules a task that logs the current value and then resets it to zero, repeating
+     * every five minutes. The first run is delayed by the absolute difference between
+     * {@code UtilAll.computNextMinutesTimeMillis()} and the current time.
+     */
+    public void init() {
+        final Runnable printThenReset = new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    printAtMinutes();
+
+                    value.set(0);
+                } catch (Throwable ignored) {
+                    // Swallowed on purpose: an exception escaping a fixed-rate task
+                    // suppresses all of its subsequent executions.
+                }
+            }
+        };
+
+        final long initialDelayMillis =
+                Math.abs(UtilAll.computNextMinutesTimeMillis() - System.currentTimeMillis());
+        final long periodMillis = 1000 * 60 * 5;
+
+        this.scheduledExecutorService.scheduleAtFixedRate(
+                printThenReset, initialDelayMillis, periodMillis, TimeUnit.MILLISECONDS);
+    }
+
+
+    /**
+     * Writes one info-level line containing the stats name, the stats key and the
+     * current value. Does not modify the value.
+     */
+    public void printAtMinutes() {
+        final String line = String.format("[%s] [%s] Stats Every 5 Minutes, Value: %d",
+                this.statsName,
+                this.statsKey,
+                this.value.get());
+        log.info(line);
+    }
+
+    /**
+     * @return the live counter itself (not a copy); changes made through it are
+     * visible to this item
+     */
+    public AtomicLong getValue() {
+        return this.value;
+    }
+
+
+    public String getStatsKey() {
+        return this.statsKey;
+    }
+
+
+    public String getStatsName() {
+        return this.statsName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/MomentStatsItemSet.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/MomentStatsItemSet.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/MomentStatsItemSet.java
new file mode 100644
index 0000000..fde88cd
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/MomentStatsItemSet.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.stats;
+
+import com.alibaba.rocketmq.common.UtilAll;
+import org.slf4j.Logger;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * A keyed collection of {@link MomentStatsItem}s that share one stats name, one
+ * scheduler and one logger.
+ * <p>
+ * Constructing the set immediately schedules a recurring task (see {@link #init()})
+ * that logs every item currently in the table. Items are created lazily the first time
+ * their key is requested.
+ */
+public class MomentStatsItemSet {
+    private final ConcurrentHashMap<String/* key */, MomentStatsItem> statsItemTable =
+            new ConcurrentHashMap<String, MomentStatsItem>(128);
+    private final String statsName;
+    private final ScheduledExecutorService scheduledExecutorService;
+    private final Logger log;
+
+
+    /**
+     * Creates the set and schedules its periodic printing task.
+     *
+     * @param statsName name handed to every item created by this set
+     * @param scheduledExecutorService executor that runs the periodic printing task
+     * @param log logger handed to every item created by this set
+     */
+    public MomentStatsItemSet(String statsName, ScheduledExecutorService scheduledExecutorService, Logger log) {
+        this.statsName = statsName;
+        this.scheduledExecutorService = scheduledExecutorService;
+        this.log = log;
+        this.init();
+    }
+
+    /**
+     * @return the live backing table (not a copy)
+     */
+    public ConcurrentHashMap<String, MomentStatsItem> getStatsItemTable() {
+        return statsItemTable;
+    }
+
+    public String getStatsName() {
+        return statsName;
+    }
+
+    /**
+     * Schedules a task that logs every item, repeating every five minutes. The first run
+     * is delayed by the absolute difference between
+     * {@code UtilAll.computNextMinutesTimeMillis()} and the current time.
+     * <p>
+     * Already invoked by the constructor; calling it again schedules an additional task.
+     */
+    public void init() {
+
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    printAtMinutes();
+                } catch (Throwable ignored) {
+                    // Swallowed on purpose: an exception escaping a fixed-rate task
+                    // suppresses all of its subsequent executions.
+                }
+            }
+        }, Math.abs(UtilAll.computNextMinutesTimeMillis() - System.currentTimeMillis()), 1000 * 60 * 5, TimeUnit.MILLISECONDS);
+    }
+
+    /** Asks every item currently in the table to log its value. */
+    private void printAtMinutes() {
+        Iterator<Entry<String, MomentStatsItem>> it = this.statsItemTable.entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<String, MomentStatsItem> next = it.next();
+            next.getValue().printAtMinutes();
+        }
+    }
+
+    /**
+     * Overwrites the value stored under {@code statsKey}, creating the item if needed.
+     *
+     * @param statsKey key of the item to update
+     * @param value new value
+     */
+    public void setValue(final String statsKey, final int value) {
+        MomentStatsItem statsItem = this.getAndCreateStatsItem(statsKey);
+        statsItem.getValue().set(value);
+    }
+
+    /**
+     * Returns the item registered under {@code statsKey}, creating and registering one
+     * if none exists yet.
+     * <p>
+     * Registration uses {@code putIfAbsent}, so when several threads race on the same
+     * new key they all receive the single instance that ended up in the table. A plain
+     * {@code put} would let a later thread replace the item an earlier thread is already
+     * updating, silently losing that update.
+     * <p>
+     * The item's own {@code init()} is not called here; printing is driven by the task
+     * this set schedules in {@link #init()}.
+     *
+     * @param statsKey key of the wanted item
+     * @return the item stored in the table for {@code statsKey}, never {@code null}
+     */
+    public MomentStatsItem getAndCreateStatsItem(final String statsKey) {
+        MomentStatsItem statsItem = this.statsItemTable.get(statsKey);
+        if (null == statsItem) {
+            MomentStatsItem created =
+                    new MomentStatsItem(this.statsName, statsKey, this.scheduledExecutorService, this.log);
+            MomentStatsItem prev = this.statsItemTable.putIfAbsent(statsKey, created);
+            // A non-null prev means another thread registered its item first; use that one.
+            statsItem = (null != prev) ? prev : created;
+        }
+
+        return statsItem;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/StatsItem.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/StatsItem.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/StatsItem.java
new file mode 100644
index 0000000..1c99699
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/StatsItem.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.stats;
+
+import com.alibaba.rocketmq.common.UtilAll;
+import org.slf4j.Logger;
+
+import java.util.LinkedList;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+/**
+ * Accumulates a running value and a running invocation count for one stats key and
+ * derives sum / TPS / average-per-invocation figures from windows of sampled snapshots.
+ * <p>
+ * Three snapshot lists are kept, each guarded by its own monitor:
+ * <ul>
+ * <li>minute window: at most 7 snapshots, sampled every 10 seconds by {@link #init()};</li>
+ * <li>hour window: at most 7 snapshots, sampled every 10 minutes by {@link #init()};</li>
+ * <li>day window: at most 25 snapshots, sampled every hour by {@link #init()}.</li>
+ * </ul>
+ * Nothing is sampled or printed unless {@link #init()} has been called or an outside
+ * caller invokes the {@code samplingIn*} / {@code printAt*} methods itself.
+ */
+public class StatsItem {
+
+    /** Running total; callers add to it through {@link #getValue()}. */
+    private final AtomicLong value = new AtomicLong(0);
+
+    /** Running invocation count; callers add to it through {@link #getTimes()}. */
+    private final AtomicLong times = new AtomicLong(0);
+
+    private final LinkedList<CallSnapshot> csListMinute = new LinkedList<CallSnapshot>();
+
+
+    private final LinkedList<CallSnapshot> csListHour = new LinkedList<CallSnapshot>();
+
+
+    private final LinkedList<CallSnapshot> csListDay = new LinkedList<CallSnapshot>();
+
+    private final String statsName;
+    private final String statsKey;
+    private final ScheduledExecutorService scheduledExecutorService;
+    private final Logger log;
+
+
+    /**
+     * @param statsName name printed first in every log line
+     * @param statsKey key printed second in every log line
+     * @param scheduledExecutorService executor used by {@link #init()} for the periodic tasks
+     * @param log logger that receives the periodic output
+     */
+    public StatsItem(String statsName, String statsKey, ScheduledExecutorService scheduledExecutorService,
+                     Logger log) {
+        this.statsName = statsName;
+        this.statsKey = statsKey;
+        this.scheduledExecutorService = scheduledExecutorService;
+        this.log = log;
+    }
+
+    /** @return figures computed from the first and last snapshot of the minute window */
+    public StatsSnapshot getStatsDataInMinute() {
+        return computeStatsData(this.csListMinute);
+    }
+
+    /**
+     * Computes sum, TPS and average-per-invocation between the oldest and the newest
+     * snapshot of {@code csList}, holding the list's monitor while reading it.
+     * <p>
+     * All three figures are zero for an empty list. TPS stays zero when the two
+     * snapshots do not span a positive amount of time (a single snapshot, or identical
+     * timestamps); dividing in that case would yield NaN or Infinity. The average stays
+     * zero when the invocation count did not grow.
+     *
+     * @param csList snapshot window, oldest first
+     * @return a freshly created snapshot holding the computed figures
+     */
+    private static StatsSnapshot computeStatsData(final LinkedList<CallSnapshot> csList) {
+        StatsSnapshot statsSnapshot = new StatsSnapshot();
+        synchronized (csList) {
+            double tps = 0;
+            double avgpt = 0;
+            long sum = 0;
+            if (!csList.isEmpty()) {
+                CallSnapshot first = csList.getFirst();
+                CallSnapshot last = csList.getLast();
+                sum = last.getValue() - first.getValue();
+
+                long elapsedMillis = last.getTimestamp() - first.getTimestamp();
+                if (elapsedMillis > 0) {
+                    tps = (sum * 1000.0d) / elapsedMillis;
+                }
+
+                long timesDiff = last.getTimes() - first.getTimes();
+                if (timesDiff > 0) {
+                    avgpt = (sum * 1.0d) / timesDiff;
+                }
+            }
+
+            statsSnapshot.setSum(sum);
+            statsSnapshot.setTps(tps);
+            statsSnapshot.setAvgpt(avgpt);
+        }
+
+        return statsSnapshot;
+    }
+
+    /** @return figures computed from the first and last snapshot of the hour window */
+    public StatsSnapshot getStatsDataInHour() {
+        return computeStatsData(this.csListHour);
+    }
+
+    /** @return figures computed from the first and last snapshot of the day window */
+    public StatsSnapshot getStatsDataInDay() {
+        return computeStatsData(this.csListDay);
+    }
+
+    /**
+     * Schedules three sampling tasks (every 10 seconds, every 10 minutes, every hour;
+     * each starting immediately) and three printing tasks (every minute, every hour,
+     * every 24 hours; each first delayed by the absolute difference between the matching
+     * {@code UtilAll.computNext*TimeMillis()} value and the current time, the daily one
+     * additionally reduced by 2000 ms).
+     * <p>
+     * Every task swallows any {@link Throwable}: an exception escaping a fixed-rate task
+     * suppresses all of its subsequent executions.
+     */
+    public void init() {
+
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    samplingInSeconds();
+                } catch (Throwable ignored) {
+                }
+            }
+        }, 0, 10, TimeUnit.SECONDS);
+
+
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    samplingInMinutes();
+                } catch (Throwable ignored) {
+                }
+            }
+        }, 0, 10, TimeUnit.MINUTES);
+
+
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    samplingInHour();
+                } catch (Throwable ignored) {
+                }
+            }
+        }, 0, 1, TimeUnit.HOURS);
+
+
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    printAtMinutes();
+                } catch (Throwable ignored) {
+                }
+            }
+        }, Math.abs(UtilAll.computNextMinutesTimeMillis() - System.currentTimeMillis()), 1000 * 60, TimeUnit.MILLISECONDS);
+
+
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    printAtHour();
+                } catch (Throwable ignored) {
+                }
+            }
+        }, Math.abs(UtilAll.computNextHourTimeMillis() - System.currentTimeMillis()), 1000 * 60 * 60, TimeUnit.MILLISECONDS);
+
+
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    printAtDay();
+                } catch (Throwable ignored) {
+                }
+            }
+        }, Math.abs(UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis()) - 2000, 1000 * 60 * 60 * 24, TimeUnit.MILLISECONDS);
+    }
+
+    /** Appends a snapshot to the minute window, keeping at most 7 entries. */
+    public void samplingInSeconds() {
+        appendSnapshot(this.csListMinute, 7);
+    }
+
+    /** Appends a snapshot to the hour window, keeping at most 7 entries. */
+    public void samplingInMinutes() {
+        appendSnapshot(this.csListHour, 7);
+    }
+
+    /** Appends a snapshot to the day window, keeping at most 25 entries. */
+    public void samplingInHour() {
+        appendSnapshot(this.csListDay, 25);
+    }
+
+    /**
+     * Records the current time, invocation count and value at the tail of
+     * {@code csList} and drops the oldest entry once the list exceeds {@code maxSize}.
+     */
+    private void appendSnapshot(final LinkedList<CallSnapshot> csList, final int maxSize) {
+        synchronized (csList) {
+            csList.add(new CallSnapshot(System.currentTimeMillis(), this.times.get(), this.value
+                    .get()));
+            if (csList.size() > maxSize) {
+                csList.removeFirst();
+            }
+        }
+    }
+
+    /** Logs sum, TPS and average computed over the minute window. */
+    public void printAtMinutes() {
+        printWindow("[%s] [%s] Stats In One Minute, SUM: %d TPS: %.2f AVGPT: %.2f", this.csListMinute);
+    }
+
+    /** Logs sum, TPS and average computed over the hour window. */
+    public void printAtHour() {
+        printWindow("[%s] [%s] Stats In One Hour, SUM: %d TPS: %.2f AVGPT: %.2f", this.csListHour);
+    }
+
+    /** Logs sum, TPS and average computed over the day window. */
+    public void printAtDay() {
+        printWindow("[%s] [%s] Stats In One Day, SUM: %d TPS: %.2f AVGPT: %.2f", this.csListDay);
+    }
+
+    /**
+     * Computes the figures for {@code csList} and writes them at info level using
+     * {@code format}, which receives name, key, sum, TPS and average in that order.
+     */
+    private void printWindow(final String format, final LinkedList<CallSnapshot> csList) {
+        StatsSnapshot ss = computeStatsData(csList);
+        log.info(String.format(format,
+                this.statsName,
+                this.statsKey,
+                ss.getSum(),
+                ss.getTps(),
+                ss.getAvgpt()));
+    }
+
+    /** @return the live value counter itself (not a copy) */
+    public AtomicLong getValue() {
+        return value;
+    }
+
+
+    public String getStatsKey() {
+        return statsKey;
+    }
+
+
+    public String getStatsName() {
+        return statsName;
+    }
+
+
+    /** @return the live invocation counter itself (not a copy) */
+    public AtomicLong getTimes() {
+        return times;
+    }
+}
+
+
+/**
+ * Immutable record of a value and an invocation count captured at a given timestamp.
+ */
+class CallSnapshot {
+    private final long timestamp;
+    private final long times;
+    private final long value;
+
+
+    /**
+     * @param timestamp time associated with the capture
+     * @param times invocation count at capture time
+     * @param value accumulated value at capture time
+     */
+    public CallSnapshot(long timestamp, long times, long value) {
+        this.timestamp = timestamp;
+        this.times = times;
+        this.value = value;
+    }
+
+
+    /** @return the timestamp given at construction */
+    public long getTimestamp() {
+        return this.timestamp;
+    }
+
+
+    /** @return the invocation count given at construction */
+    public long getTimes() {
+        return this.times;
+    }
+
+
+    /** @return the value given at construction */
+    public long getValue() {
+        return this.value;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/StatsItemSet.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/StatsItemSet.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/StatsItemSet.java
new file mode 100644
index 0000000..8a2b2a1
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/StatsItemSet.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.stats;
+
+import com.alibaba.rocketmq.common.UtilAll;
+import org.slf4j.Logger;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * A keyed collection of {@link StatsItem}s that share one stats name, one scheduler and
+ * one logger.
+ * <p>
+ * Constructing the set immediately schedules the recurring sampling and printing tasks
+ * (see {@link #init()}); each task walks the table and delegates to every item. Items
+ * are created lazily the first time their key is used.
+ */
+public class StatsItemSet {
+    private final ConcurrentHashMap<String/* key */, StatsItem> statsItemTable =
+            new ConcurrentHashMap<String, StatsItem>(128);
+
+    private final String statsName;
+    private final ScheduledExecutorService scheduledExecutorService;
+    private final Logger log;
+
+
+    /**
+     * Creates the set and schedules its periodic tasks.
+     *
+     * @param statsName name handed to every item created by this set
+     * @param scheduledExecutorService executor that runs the periodic tasks
+     * @param log logger handed to every item created by this set
+     */
+    public StatsItemSet(String statsName, ScheduledExecutorService scheduledExecutorService, Logger log) {
+        this.statsName = statsName;
+        this.scheduledExecutorService = scheduledExecutorService;
+        this.log = log;
+        this.init();
+    }
+
+    /**
+     * Schedules three sampling tasks (every 10 seconds, every 10 minutes, every hour;
+     * each starting immediately) and three printing tasks (every minute, every hour,
+     * every 24 hours; each first delayed by the absolute difference between the matching
+     * {@code UtilAll.computNext*TimeMillis()} value and the current time).
+     * <p>
+     * Every task swallows any {@link Throwable}: an exception escaping a fixed-rate task
+     * suppresses all of its subsequent executions.
+     * <p>
+     * Already invoked by the constructor; calling it again schedules additional tasks.
+     */
+    public void init() {
+
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    samplingInSeconds();
+                } catch (Throwable ignored) {
+                }
+            }
+        }, 0, 10, TimeUnit.SECONDS);
+
+
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    samplingInMinutes();
+                } catch (Throwable ignored) {
+                }
+            }
+        }, 0, 10, TimeUnit.MINUTES);
+
+
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    samplingInHour();
+                } catch (Throwable ignored) {
+                }
+            }
+        }, 0, 1, TimeUnit.HOURS);
+
+
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    printAtMinutes();
+                } catch (Throwable ignored) {
+                }
+            }
+        }, Math.abs(UtilAll.computNextMinutesTimeMillis() - System.currentTimeMillis()), 1000 * 60, TimeUnit.MILLISECONDS);
+
+
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    printAtHour();
+                } catch (Throwable ignored) {
+                }
+            }
+        }, Math.abs(UtilAll.computNextHourTimeMillis() - System.currentTimeMillis()), 1000 * 60 * 60, TimeUnit.MILLISECONDS);
+
+
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    printAtDay();
+                } catch (Throwable ignored) {
+                }
+            }
+        }, Math.abs(UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis()), 1000 * 60 * 60 * 24, TimeUnit.MILLISECONDS);
+    }
+
+    /** Takes a minute-window sample on every item currently in the table. */
+    private void samplingInSeconds() {
+        Iterator<Entry<String, StatsItem>> it = this.statsItemTable.entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<String, StatsItem> next = it.next();
+            next.getValue().samplingInSeconds();
+        }
+    }
+
+    /** Takes an hour-window sample on every item currently in the table. */
+    private void samplingInMinutes() {
+        Iterator<Entry<String, StatsItem>> it = this.statsItemTable.entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<String, StatsItem> next = it.next();
+            next.getValue().samplingInMinutes();
+        }
+    }
+
+    /** Takes a day-window sample on every item currently in the table. */
+    private void samplingInHour() {
+        Iterator<Entry<String, StatsItem>> it = this.statsItemTable.entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<String, StatsItem> next = it.next();
+            next.getValue().samplingInHour();
+        }
+    }
+
+    /** Logs the minute-window figures of every item currently in the table. */
+    private void printAtMinutes() {
+        Iterator<Entry<String, StatsItem>> it = this.statsItemTable.entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<String, StatsItem> next = it.next();
+            next.getValue().printAtMinutes();
+        }
+    }
+
+    /** Logs the hour-window figures of every item currently in the table. */
+    private void printAtHour() {
+        Iterator<Entry<String, StatsItem>> it = this.statsItemTable.entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<String, StatsItem> next = it.next();
+            next.getValue().printAtHour();
+        }
+    }
+
+    /** Logs the day-window figures of every item currently in the table. */
+    private void printAtDay() {
+        Iterator<Entry<String, StatsItem>> it = this.statsItemTable.entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<String, StatsItem> next = it.next();
+            next.getValue().printAtDay();
+        }
+    }
+
+    /**
+     * Adds to the value and to the invocation count stored under {@code statsKey},
+     * creating the item if needed.
+     *
+     * @param statsKey key of the item to update
+     * @param incValue amount added to the item's value
+     * @param incTimes amount added to the item's invocation count
+     */
+    public void addValue(final String statsKey, final int incValue, final int incTimes) {
+        StatsItem statsItem = this.getAndCreateStatsItem(statsKey);
+        statsItem.getValue().addAndGet(incValue);
+        statsItem.getTimes().addAndGet(incTimes);
+    }
+
+    /**
+     * Returns the item registered under {@code statsKey}, creating and registering one
+     * if none exists yet.
+     * <p>
+     * Registration uses {@code putIfAbsent}, so when several threads race on the same
+     * new key they all receive the single instance that ended up in the table. A plain
+     * {@code put} would let a later thread replace the item an earlier thread is already
+     * incrementing, silently losing those increments.
+     * <p>
+     * The item's own {@code init()} is not called here; sampling and printing are driven
+     * by the tasks this set schedules in {@link #init()}.
+     *
+     * @param statsKey key of the wanted item
+     * @return the item stored in the table for {@code statsKey}, never {@code null}
+     */
+    public StatsItem getAndCreateStatsItem(final String statsKey) {
+        StatsItem statsItem = this.statsItemTable.get(statsKey);
+        if (null == statsItem) {
+            StatsItem created = new StatsItem(this.statsName, statsKey, this.scheduledExecutorService, this.log);
+            StatsItem prev = this.statsItemTable.putIfAbsent(statsKey, created);
+            // A non-null prev means another thread registered its item first; use that one.
+            statsItem = (null != prev) ? prev : created;
+        }
+
+        return statsItem;
+    }
+
+    /**
+     * @return minute-window figures for {@code statsKey}, or an all-zero snapshot when
+     * no item is registered under that key
+     */
+    public StatsSnapshot getStatsDataInMinute(final String statsKey) {
+        StatsItem statsItem = this.statsItemTable.get(statsKey);
+        if (null != statsItem) {
+            return statsItem.getStatsDataInMinute();
+        }
+        return new StatsSnapshot();
+    }
+
+    /**
+     * @return hour-window figures for {@code statsKey}, or an all-zero snapshot when
+     * no item is registered under that key
+     */
+    public StatsSnapshot getStatsDataInHour(final String statsKey) {
+        StatsItem statsItem = this.statsItemTable.get(statsKey);
+        if (null != statsItem) {
+            return statsItem.getStatsDataInHour();
+        }
+        return new StatsSnapshot();
+    }
+
+    /**
+     * @return day-window figures for {@code statsKey}, or an all-zero snapshot when
+     * no item is registered under that key
+     */
+    public StatsSnapshot getStatsDataInDay(final String statsKey) {
+        StatsItem statsItem = this.statsItemTable.get(statsKey);
+        if (null != statsItem) {
+            return statsItem.getStatsDataInDay();
+        }
+        return new StatsSnapshot();
+    }
+
+    /**
+     * @return the item registered under {@code statsKey}, or {@code null} if there is
+     * none; unlike {@link #getAndCreateStatsItem(String)} this never creates an item
+     */
+    public StatsItem getStatsItem(final String statsKey) {
+        return this.statsItemTable.get(statsKey);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/StatsSnapshot.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/StatsSnapshot.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/StatsSnapshot.java
new file mode 100644
index 0000000..4092a2b
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/stats/StatsSnapshot.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.stats;
+
+/**
+ * Mutable holder for three derived figures: a sum, a TPS rate and an average
+ * ({@code avgpt}). A new instance has all three at zero.
+ */
+public class StatsSnapshot {
+    private long sum;
+    private double tps;
+    private double avgpt;
+
+
+    /** @return the stored sum */
+    public long getSum() {
+        return this.sum;
+    }
+
+
+    /** @param sum the sum to store */
+    public void setSum(long sum) {
+        this.sum = sum;
+    }
+
+
+    /** @return the stored TPS figure */
+    public double getTps() {
+        return this.tps;
+    }
+
+
+    /** @param tps the TPS figure to store */
+    public void setTps(double tps) {
+        this.tps = tps;
+    }
+
+
+    /** @return the stored average figure */
+    public double getAvgpt() {
+        return this.avgpt;
+    }
+
+
+    /** @param avgpt the average figure to store */
+    public void setAvgpt(double avgpt) {
+        this.avgpt = avgpt;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/subscription/SubscriptionGroupConfig.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/subscription/SubscriptionGroupConfig.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/subscription/SubscriptionGroupConfig.java
new file mode 100644
index 0000000..cf8baf2
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/subscription/SubscriptionGroupConfig.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.subscription;
+
+import com.alibaba.rocketmq.common.MixAll;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class SubscriptionGroupConfig {
+
+    private String groupName;
+
+    private boolean consumeEnable = true;
+    private boolean consumeFromMinEnable = true;
+
+    private boolean consumeBroadcastEnable = true;
+
+    private int retryQueueNums = 1;
+
+    private int retryMaxTimes = 16;
+
+    private long brokerId = MixAll.MASTER_ID;
+
+    private long whichBrokerWhenConsumeSlowly = 1;
+
+    private boolean notifyConsumerIdsChangedEnable = true;
+
+
+    public String getGroupName() {
+        return groupName;
+    }
+
+
+    public void setGroupName(String groupName) {
+        this.groupName = groupName;
+    }
+
+
+    public boolean isConsumeEnable() {
+        return consumeEnable;
+    }
+
+
+    public void setConsumeEnable(boolean consumeEnable) {
+        this.consumeEnable = consumeEnable;
+    }
+
+
+    public boolean isConsumeFromMinEnable() {
+        return consumeFromMinEnable;
+    }
+
+
+    public void setConsumeFromMinEnable(boolean consumeFromMinEnable) {
+        this.consumeFromMinEnable = consumeFromMinEnable;
+    }
+
+
+    public boolean isConsumeBroadcastEnable() {
+        return consumeBroadcastEnable;
+    }
+
+
+    public void setConsumeBroadcastEnable(boolean consumeBroadcastEnable) {
+        this.consumeBroadcastEnable = consumeBroadcastEnable;
+    }
+
+
+    public int getRetryQueueNums() {
+        return retryQueueNums;
+    }
+
+
+    public void setRetryQueueNums(int retryQueueNums) {
+        this.retryQueueNums = retryQueueNums;
+    }
+
+
+    public int getRetryMaxTimes() {
+        return retryMaxTimes;
+    }
+
+
+    public void setRetryMaxTimes(int retryMaxTimes) {
+        this.retryMaxTimes = retryMaxTimes;
+    }
+
+
+    public long getBrokerId() {
+        return brokerId;
+    }
+
+
+    public void setBrokerId(long brokerId) {
+        this.brokerId = brokerId;
+    }
+
+
+    public long getWhichBrokerWhenConsumeSlowly() {
+        return whichBrokerWhenConsumeSlowly;
+    }
+
+
+    public void setWhichBrokerWhenConsumeSlowly(long whichBrokerWhenConsumeSlowly) {
+        this.whichBrokerWhenConsumeSlowly = whichBrokerWhenConsumeSlowly;
+    }
+
+    public boolean isNotifyConsumerIdsChangedEnable() {
+        return notifyConsumerIdsChangedEnable;
+    }
+
+    public void setNotifyConsumerIdsChangedEnable(final boolean notifyConsumerIdsChangedEnable) {
+        this.notifyConsumerIdsChangedEnable = notifyConsumerIdsChangedEnable;
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + (int) (brokerId ^ (brokerId >>> 32));
+        result = prime * result + (consumeBroadcastEnable ? 1231 : 1237);
+        result = prime * result + (consumeEnable ? 1231 : 1237);
+        result = prime * result + (consumeFromMinEnable ? 1231 : 1237);
+        result = prime * result + (notifyConsumerIdsChangedEnable ? 1231 : 1237);
+        result = prime * result + ((groupName == null) ? 0 : groupName.hashCode());
+        result = prime * result + retryMaxTimes;
+        result = prime * result + retryQueueNums;
+        result =
+                prime * result + (int) (whichBrokerWhenConsumeSlowly ^ (whichBrokerWhenConsumeSlowly >>> 32));
+        return result;
+    }
+
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        SubscriptionGroupConfig other = (SubscriptionGroupConfig) obj;
+        if (brokerId != other.brokerId)
+            return false;
+        if (consumeBroadcastEnable != other.consumeBroadcastEnable)
+            return false;
+        if (consumeEnable != other.consumeEnable)
+            return false;
+        if (consumeFromMinEnable != other.consumeFromMinEnable)
+            return false;
+        if (groupName == null) {
+            if (other.groupName != null)
+                return false;
+        } else if (!groupName.equals(other.groupName))
+            return false;
+        if (retryMaxTimes != other.retryMaxTimes)
+            return false;
+        if (retryQueueNums != other.retryQueueNums)
+            return false;
+        if (whichBrokerWhenConsumeSlowly != other.whichBrokerWhenConsumeSlowly)
+            return false;
+        if (notifyConsumerIdsChangedEnable != other.notifyConsumerIdsChangedEnable)
+            return false;
+        return true;
+    }
+
+
+    @Override
+    public String toString() {
+        return "SubscriptionGroupConfig [groupName=" + groupName + ", consumeEnable=" + consumeEnable
+                + ", consumeFromMinEnable=" + consumeFromMinEnable + ", consumeBroadcastEnable="
+                + consumeBroadcastEnable + ", retryQueueNums=" + retryQueueNums + ", retryMaxTimes="
+                + retryMaxTimes + ", brokerId=" + brokerId + ", whichBrokerWhenConsumeSlowly="
+                + whichBrokerWhenConsumeSlowly + ", notifyConsumerIdsChangedEnable="
+                + notifyConsumerIdsChangedEnable + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/sysflag/MessageSysFlag.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/sysflag/MessageSysFlag.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/sysflag/MessageSysFlag.java
new file mode 100644
index 0000000..2f9d057
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/sysflag/MessageSysFlag.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.common.sysflag;
+
+/**
+ * Bit-flag constants and helpers for an integer message system flag.
+ *
+ * <p>Bit 0 is {@link #COMPRESSED_FLAG}, bit 1 is {@link #MULTI_TAGS_FLAG}, and bits 2-3
+ * together hold one of the {@code TRANSACTION_*_TYPE} values.
+ *
+ * @author shijia.wxr
+ */
+public class MessageSysFlag {
+    public static final int COMPRESSED_FLAG = 1;
+    public static final int MULTI_TAGS_FLAG = 1 << 1;
+    public static final int TRANSACTION_NOT_TYPE = 0;
+    public static final int TRANSACTION_PREPARED_TYPE = 1 << 2;
+    public static final int TRANSACTION_COMMIT_TYPE = 2 << 2;
+    public static final int TRANSACTION_ROLLBACK_TYPE = 3 << 2;
+
+
+    /**
+     * Extracts the transaction-type bits from a flag.
+     *
+     * @param flag the full system flag
+     * @return {@code flag} with every bit outside bits 2-3 cleared
+     */
+    public static int getTransactionValue(final int flag) {
+        // TRANSACTION_ROLLBACK_TYPE has both transaction bits set, so it doubles as the mask.
+        final int transactionMask = TRANSACTION_ROLLBACK_TYPE;
+        return flag & transactionMask;
+    }
+
+
+    /**
+     * Replaces the transaction-type bits of a flag.
+     *
+     * @param flag the full system flag
+     * @param type the value OR-ed into the flag after bits 2-3 have been cleared
+     * @return the flag with its transaction bits cleared and {@code type} OR-ed in
+     */
+    public static int resetTransactionValue(final int flag, final int type) {
+        final int withoutTransactionBits = flag & ~TRANSACTION_ROLLBACK_TYPE;
+        return withoutTransactionBits | type;
+    }
+
+
+    /**
+     * Clears the compressed bit.
+     *
+     * @param flag the full system flag
+     * @return {@code flag} with bit 0 cleared
+     */
+    public static int clearCompressedFlag(final int flag) {
+        final int keepMask = ~COMPRESSED_FLAG;
+        return flag & keepMask;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/sysflag/PullSysFlag.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/sysflag/PullSysFlag.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/sysflag/PullSysFlag.java
new file mode 100644
index 0000000..d0f7287
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/sysflag/PullSysFlag.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.common.sysflag;
+
+/**
+ * Bit-flag helpers for the integer system flag produced by {@link #buildSysFlag}.
+ *
+ * <p>Bit 0 is commit-offset, bit 1 is suspend, bit 2 is subscription and bit 3 is
+ * class-filter.
+ *
+ * @author shijia.wxr
+ */
+public class PullSysFlag {
+    private static final int FLAG_COMMIT_OFFSET = 1;
+    private static final int FLAG_SUSPEND = 1 << 1;
+    private static final int FLAG_SUBSCRIPTION = 1 << 2;
+    private static final int FLAG_CLASS_FILTER = 1 << 3;
+
+
+    /**
+     * Combines the four boolean options into one flag value.
+     *
+     * @param commitOffset sets bit 0 when true
+     * @param suspend sets bit 1 when true
+     * @param subscription sets bit 2 when true
+     * @param classFilter sets bit 3 when true
+     * @return the OR of the bits selected by the arguments
+     */
+    public static int buildSysFlag(final boolean commitOffset, final boolean suspend,
+                                   final boolean subscription, final boolean classFilter) {
+        int bits = commitOffset ? FLAG_COMMIT_OFFSET : 0;
+        bits |= suspend ? FLAG_SUSPEND : 0;
+        bits |= subscription ? FLAG_SUBSCRIPTION : 0;
+        bits |= classFilter ? FLAG_CLASS_FILTER : 0;
+        return bits;
+    }
+
+
+    /** Returns {@code sysFlag} with the commit-offset bit cleared. */
+    public static int clearCommitOffsetFlag(final int sysFlag) {
+        final int keepMask = ~FLAG_COMMIT_OFFSET;
+        return sysFlag & keepMask;
+    }
+
+
+    /** Returns true when the commit-offset bit is set in {@code sysFlag}. */
+    public static boolean hasCommitOffsetFlag(final int sysFlag) {
+        return 0 != (sysFlag & FLAG_COMMIT_OFFSET);
+    }
+
+
+    /** Returns true when the suspend bit is set in {@code sysFlag}. */
+    public static boolean hasSuspendFlag(final int sysFlag) {
+        return 0 != (sysFlag & FLAG_SUSPEND);
+    }
+
+
+    /** Returns true when the subscription bit is set in {@code sysFlag}. */
+    public static boolean hasSubscriptionFlag(final int sysFlag) {
+        return 0 != (sysFlag & FLAG_SUBSCRIPTION);
+    }
+
+
+    /** Returns true when the class-filter bit is set in {@code sysFlag}. */
+    public static boolean hasClassFilterFlag(final int sysFlag) {
+        return 0 != (sysFlag & FLAG_CLASS_FILTER);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/sysflag/SubscriptionSysFlag.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/sysflag/SubscriptionSysFlag.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/sysflag/SubscriptionSysFlag.java
new file mode 100644
index 0000000..65e3115
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/sysflag/SubscriptionSysFlag.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.common.sysflag;
+
+/**
+ * Bit-flag helpers for an integer subscription system flag; only bit 0 (unit) is defined.
+ *
+ * @author manhong.yqd
+ */
+public class SubscriptionSysFlag {
+
+    private static final int FLAG_UNIT = 1;
+
+
+    /**
+     * Builds a flag value from the unit option.
+     *
+     * @param unit sets bit 0 when true
+     * @return {@code 1} when {@code unit} is true, otherwise {@code 0}
+     */
+    public static int buildSysFlag(final boolean unit) {
+        return unit ? FLAG_UNIT : 0;
+    }
+
+
+    /** Returns {@code sysFlag} with the unit bit set. */
+    public static int setUnitFlag(final int sysFlag) {
+        return FLAG_UNIT | sysFlag;
+    }
+
+
+    /** Returns {@code sysFlag} with the unit bit cleared. */
+    public static int clearUnitFlag(final int sysFlag) {
+        final int keepMask = ~FLAG_UNIT;
+        return sysFlag & keepMask;
+    }
+
+
+    /** Returns true when the unit bit is set in {@code sysFlag}. */
+    public static boolean hasUnitFlag(final int sysFlag) {
+        return 0 != (sysFlag & FLAG_UNIT);
+    }
+
+
+    /** Intentionally empty entry point. */
+    public static void main(String[] args) {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/sysflag/TopicSysFlag.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/sysflag/TopicSysFlag.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/sysflag/TopicSysFlag.java
new file mode 100644
index 0000000..90d48f4
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/sysflag/TopicSysFlag.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.common.sysflag;
+
+/**
+ * Bit-flag helpers for an integer topic system flag: bit 0 is the unit flag and
+ * bit 1 is the unit-sub flag.
+ *
+ * @author manhong.yqd
+ */
+public class TopicSysFlag {
+
+    private static final int FLAG_UNIT = 1;
+
+    private static final int FLAG_UNIT_SUB = 1 << 1;
+
+
+    /**
+     * Combines the two boolean options into one flag value.
+     *
+     * @param unit sets bit 0 when true
+     * @param hasUnitSub sets bit 1 when true
+     * @return the OR of the bits selected by the arguments
+     */
+    public static int buildSysFlag(final boolean unit, final boolean hasUnitSub) {
+        final int unitBit = unit ? FLAG_UNIT : 0;
+        final int unitSubBit = hasUnitSub ? FLAG_UNIT_SUB : 0;
+        return unitBit | unitSubBit;
+    }
+
+
+    /** Returns {@code sysFlag} with the unit bit set. */
+    public static int setUnitFlag(final int sysFlag) {
+        return FLAG_UNIT | sysFlag;
+    }
+
+
+    /** Returns {@code sysFlag} with the unit bit cleared. */
+    public static int clearUnitFlag(final int sysFlag) {
+        final int keepMask = ~FLAG_UNIT;
+        return sysFlag & keepMask;
+    }
+
+
+    /** Returns true when the unit bit is set in {@code sysFlag}. */
+    public static boolean hasUnitFlag(final int sysFlag) {
+        return 0 != (sysFlag & FLAG_UNIT);
+    }
+
+
+    /** Returns {@code sysFlag} with the unit-sub bit set. */
+    public static int setUnitSubFlag(final int sysFlag) {
+        return FLAG_UNIT_SUB | sysFlag;
+    }
+
+
+    /** Returns {@code sysFlag} with the unit-sub bit cleared. */
+    public static int clearUnitSubFlag(final int sysFlag) {
+        final int keepMask = ~FLAG_UNIT_SUB;
+        return sysFlag & keepMask;
+    }
+
+
+    /** Returns true when the unit-sub bit is set in {@code sysFlag}. */
+    public static boolean hasUnitSubFlag(final int sysFlag) {
+        return 0 != (sysFlag & FLAG_UNIT_SUB);
+    }
+
+
+    /** Intentionally empty entry point. */
+    public static void main(String[] args) {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/utils/ChannelUtil.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/utils/ChannelUtil.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/utils/ChannelUtil.java
new file mode 100644
index 0000000..444928f
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/utils/ChannelUtil.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.utils;
+
+import io.netty.channel.Channel;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+/**
+ * Helper for reading the remote peer address of a Netty {@link Channel}.
+ */
+public class ChannelUtil {
+    /**
+     * Returns the IP address of the peer the channel is connected to.
+     *
+     * @param channel the channel whose remote address is inspected
+     * @return the textual IP of the remote address; the host name when the address
+     *         carries no resolved {@link InetAddress}; or an empty string when the
+     *         channel has no remote address or the remote address is not an
+     *         {@link InetSocketAddress}
+     */
+    public static String getRemoteIp(Channel channel) {
+        final java.net.SocketAddress remote = channel.remoteAddress();
+        // instanceof is false for null (no remote address) and for non-IP address
+        // types, which previously caused a ClassCastException on the blind cast.
+        if (!(remote instanceof InetSocketAddress)) {
+            return "";
+        }
+        final InetSocketAddress inetSocketAddress = (InetSocketAddress) remote;
+        final InetAddress inetAddr = inetSocketAddress.getAddress();
+        return inetAddr != null ? inetAddr.getHostAddress() : inetSocketAddress.getHostName();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/utils/HttpTinyClient.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/utils/HttpTinyClient.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/utils/HttpTinyClient.java
new file mode 100755
index 0000000..dadac46
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/utils/HttpTinyClient.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.utils;
+
+import com.alibaba.rocketmq.common.MQVersion;
+import com.alibaba.rocketmq.common.MixAll;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.util.Iterator;
+import java.util.List;
+
+
+/**
+ * Minimal HTTP GET/POST client built on {@link HttpURLConnection}.
+ */
+public class HttpTinyClient {
+
+    /**
+     * Sends an HTTP GET request and reads the whole response body.
+     *
+     * @param url target URL; when {@code paramValues} is non-null the encoded
+     *        parameters are appended after a {@code '?'}
+     * @param headers alternating header names and values, may be null
+     * @param paramValues alternating parameter names and values, may be null
+     * @param encoding charset name used to URL-encode parameter values, advertised in
+     *        the Content-Type header and used to decode the response
+     * @param readTimeoutMs timeout in milliseconds, applied to both connect and read
+     * @return the response code together with the response body; the body is an empty
+     *         string when the server supplied none
+     * @throws IOException if the request cannot be sent or the response cannot be read
+     */
+    static public HttpResult httpGet(String url, List<String> headers, List<String> paramValues,
+                                     String encoding, long readTimeoutMs) throws IOException {
+        String encodedContent = encodingParams(paramValues, encoding);
+        url += (null == encodedContent) ? "" : ("?" + encodedContent);
+
+        HttpURLConnection conn = null;
+        try {
+            conn = (HttpURLConnection) new URL(url).openConnection();
+            conn.setRequestMethod("GET");
+            conn.setConnectTimeout((int) readTimeoutMs);
+            conn.setReadTimeout((int) readTimeoutMs);
+            setHeaders(conn, headers, encoding);
+
+            conn.connect();
+            int respCode = conn.getResponseCode();
+            return new HttpResult(respCode, readBody(conn, respCode, encoding));
+        } finally {
+            if (conn != null) {
+                conn.disconnect();
+            }
+        }
+    }
+
+    /**
+     * Joins alternating name/value entries into a {@code name=value&name=value} string,
+     * URL-encoding the values only.
+     *
+     * @param paramValues alternating parameter names and values; a list with an odd
+     *        number of entries fails with {@link java.util.NoSuchElementException}
+     * @param encoding charset name passed to {@link URLEncoder#encode(String, String)}
+     * @return the joined string, or null when {@code paramValues} is null
+     */
+    static private String encodingParams(List<String> paramValues, String encoding)
+            throws UnsupportedEncodingException {
+        if (null == paramValues) {
+            return null;
+        }
+
+        StringBuilder sb = new StringBuilder();
+        for (Iterator<String> iter = paramValues.iterator(); iter.hasNext(); ) {
+            sb.append(iter.next()).append("=");
+            sb.append(URLEncoder.encode(iter.next(), encoding));
+            if (iter.hasNext()) {
+                sb.append("&");
+            }
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Adds the caller-supplied headers followed by the Client-Version, Content-Type
+     * and Metaq-Client-RequestTS headers.
+     *
+     * @param conn connection the request properties are added to
+     * @param headers alternating header names and values, may be null
+     * @param encoding charset name appended to the Content-Type header
+     */
+    static private void setHeaders(HttpURLConnection conn, List<String> headers, String encoding) {
+        if (null != headers) {
+            for (Iterator<String> iter = headers.iterator(); iter.hasNext(); ) {
+                conn.addRequestProperty(iter.next(), iter.next());
+            }
+        }
+        conn.addRequestProperty("Client-Version", MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION));
+        conn.addRequestProperty("Content-Type", "application/x-www-form-urlencoded;charset=" + encoding);
+
+
+        String ts = String.valueOf(System.currentTimeMillis());
+        conn.addRequestProperty("Metaq-Client-RequestTS", ts);
+    }
+
+    /**
+     * Reads the response body: the input stream for HTTP 200, the error stream otherwise.
+     *
+     * @return the decoded body, or an empty string when the connection offers no stream
+     */
+    static private String readBody(HttpURLConnection conn, int respCode, String encoding)
+            throws IOException {
+        final java.io.InputStream body = (HttpURLConnection.HTTP_OK == respCode)
+                ? conn.getInputStream() : conn.getErrorStream();
+        // getErrorStream() returns null when the server sent no error body; decoding
+        // a null stream would fail with a NullPointerException.
+        return (null == body) ? "" : IOTinyUtils.toString(body, encoding);
+    }
+
+    /**
+     * Sends an HTTP POST request with a form-encoded body and reads the whole response body.
+     *
+     * @param url target URL
+     * @param headers alternating header names and values, may be null
+     * @param paramValues alternating parameter names and values sent as the request
+     *        body; null sends an empty body
+     * @param encoding charset name used to URL-encode parameter values, advertised in
+     *        the Content-Type header and used to decode the response
+     * @param readTimeoutMs read timeout in milliseconds; the connect timeout is fixed
+     *        at 3000 ms
+     *
+     * @return the http response of given http post request
+     *
+     * @throws java.io.IOException if the request cannot be sent or the response cannot be read
+     */
+    static public HttpResult httpPost(String url, List<String> headers, List<String> paramValues,
+                                      String encoding, long readTimeoutMs) throws IOException {
+        String encodedContent = encodingParams(paramValues, encoding);
+
+        HttpURLConnection conn = null;
+        try {
+            conn = (HttpURLConnection) new URL(url).openConnection();
+            conn.setRequestMethod("POST");
+            conn.setConnectTimeout(3000);
+            conn.setReadTimeout((int) readTimeoutMs);
+            conn.setDoOutput(true);
+            conn.setDoInput(true);
+            setHeaders(conn, headers, encoding);
+
+            // encodedContent is null when no parameters were given; send an empty body
+            // instead of dereferencing null.
+            final byte[] payload = (null == encodedContent)
+                    ? new byte[0] : encodedContent.getBytes(MixAll.DEFAULT_CHARSET);
+            conn.getOutputStream().write(payload);
+
+            int respCode = conn.getResponseCode();
+            return new HttpResult(respCode, readBody(conn, respCode, encoding));
+        } finally {
+            if (null != conn) {
+                conn.disconnect();
+            }
+        }
+    }
+
+    /**
+     * Immutable pair of HTTP response code and response body.
+     */
+    static public class HttpResult {
+        final public int code;
+        final public String content;
+
+
+        public HttpResult(int code, String content) {
+            this.code = code;
+            this.content = content;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/utils/IOTinyUtils.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/utils/IOTinyUtils.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/utils/IOTinyUtils.java
new file mode 100644
index 0000000..ced2fae
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/utils/IOTinyUtils.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.utils;
+
+import com.alibaba.rocketmq.remoting.common.RemotingHelper;
+
+import java.io.*;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * Small collection of stream, reader and file helpers.
+ *
+ * @author manhong.yqd
+ */
+public class IOTinyUtils {
+
+    /**
+     * Reads the whole stream and decodes it into a string. The stream is not closed here.
+     *
+     * @param input stream to read until end-of-stream
+     * @param encoding charset name; when null, {@code RemotingHelper.DEFAULT_CHARSET} is used
+     * @return the decoded content
+     * @throws IOException if reading fails
+     */
+    static public String toString(InputStream input, String encoding) throws IOException {
+        return (null == encoding) ? toString(new InputStreamReader(input, RemotingHelper.DEFAULT_CHARSET)) : toString(new InputStreamReader(
+                input, encoding));
+    }
+
+
+    /**
+     * Reads the reader until end-of-stream and returns everything read as a string.
+     *
+     * @param reader source of characters; it is not closed here
+     * @return all characters read
+     * @throws IOException if reading fails
+     */
+    static public String toString(Reader reader) throws IOException {
+        CharArrayWriter sw = new CharArrayWriter();
+        copy(reader, sw);
+        return sw.toString();
+    }
+
+
+    /**
+     * Copies all characters from {@code input} to {@code output} through a 4096-char buffer.
+     *
+     * @param input source of characters
+     * @param output destination of characters
+     * @return the number of characters copied
+     * @throws IOException if reading or writing fails
+     */
+    static public long copy(Reader input, Writer output) throws IOException {
+        char[] buffer = new char[1 << 12];
+        long count = 0;
+        for (int n = 0; (n = input.read(buffer)) >= 0; ) {
+            output.write(buffer, 0, n);
+            count += n;
+        }
+        return count;
+    }
+
+
+    /**
+     * Reads every line from the reader.
+     *
+     * @param input source of characters; wrapped in a {@link BufferedReader} unless it
+     *        already is one
+     * @return the lines in reading order, without line terminators
+     * @throws IOException if reading fails
+     */
+    static public List<String> readLines(Reader input) throws IOException {
+        BufferedReader reader = toBufferedReader(input);
+        List<String> list = new ArrayList<String>();
+        String line = null;
+        while (null != (line = reader.readLine())) {
+            list.add(line);
+        }
+        return list;
+    }
+
+
+    /** Returns the reader itself when it is already buffered, otherwise a buffering wrapper. */
+    static private BufferedReader toBufferedReader(Reader reader) {
+        return reader instanceof BufferedReader ? (BufferedReader) reader : new BufferedReader(reader);
+    }
+
+
+    /**
+     * Copies the content of one file to another, creating the target file and its
+     * parent directories when needed.
+     *
+     * @param source path of the file to read
+     * @param target path of the file to write; existing content is replaced
+     * @throws IllegalArgumentException if the source file does not exist
+     * @throws RuntimeException if the target file cannot be created
+     * @throws IOException if opening, copying or closing fails
+     */
+    static public void copyFile(String source, String target) throws IOException {
+        File sf = new File(source);
+        if (!sf.exists()) {
+            throw new IllegalArgumentException("source file does not exist.");
+        }
+        File tf = new File(target);
+        // getParentFile() is null for a path without a directory component.
+        File parent = tf.getParentFile();
+        if (null != parent) {
+            parent.mkdirs();
+        }
+        if (!tf.exists() && !tf.createNewFile()) {
+            throw new RuntimeException("failed to create target file.");
+        }
+
+        FileChannel sc = null;
+        FileChannel tc = null;
+        try {
+            tc = new FileOutputStream(tf).getChannel();
+            sc = new FileInputStream(sf).getChannel();
+            // A single transferTo call may move fewer bytes than requested, so repeat
+            // until the whole source has been copied.
+            final long size = sc.size();
+            long position = 0;
+            while (position < size) {
+                long transferred = sc.transferTo(position, size - position, tc);
+                if (transferred <= 0) {
+                    // Nothing more can be read (e.g. the source shrank); avoid spinning.
+                    break;
+                }
+                position += transferred;
+            }
+        } finally {
+            // Close the target even when closing the source fails.
+            try {
+                if (null != sc) {
+                    sc.close();
+                }
+            } finally {
+                if (null != tc) {
+                    tc.close();
+                }
+            }
+        }
+    }
+
+
+    /**
+     * Deletes a file, or a directory together with its content. The result of the
+     * final {@link File#delete()} call is not checked.
+     *
+     * @param fileOrDir file or directory to delete; null is ignored
+     * @throws IOException if the content of a directory cannot be listed
+     */
+    public static void delete(File fileOrDir) throws IOException {
+        if (fileOrDir == null) {
+            return;
+        }
+
+        if (fileOrDir.isDirectory()) {
+            cleanDirectory(fileOrDir);
+        }
+
+        fileOrDir.delete();
+    }
+
+
+    /**
+     * Deletes everything inside a directory while keeping the directory itself.
+     *
+     * @param directory directory to empty
+     * @throws IllegalArgumentException if {@code directory} does not exist or is not a directory
+     * @throws IOException if the directory cannot be listed, or the last failure seen
+     *         while deleting its entries
+     */
+    public static void cleanDirectory(File directory) throws IOException {
+        if (!directory.exists()) {
+            String message = directory + " does not exist";
+            throw new IllegalArgumentException(message);
+        }
+
+        if (!directory.isDirectory()) {
+            String message = directory + " is not a directory";
+            throw new IllegalArgumentException(message);
+        }
+
+        File[] files = directory.listFiles();
+        if (files == null) { // null if security restricted
+            throw new IOException("Failed to list contents of " + directory);
+        }
+
+        // Keep going after a failure so the remaining entries are still attempted;
+        // only the most recent failure is reported.
+        IOException exception = null;
+        for (File file : files) {
+            try {
+                delete(file);
+            } catch (IOException ioe) {
+                exception = ioe;
+            }
+        }
+
+        if (null != exception) {
+            throw exception;
+        }
+    }
+
+
+    /**
+     * Writes a string to a file, replacing any existing content.
+     *
+     * @param file file to write
+     * @param data text to write
+     * @param encoding charset name used to encode {@code data}
+     * @throws IOException if the file cannot be opened or written, or the charset
+     *         name is unsupported
+     */
+    public static void writeStringToFile(File file, String data, String encoding) throws IOException {
+        OutputStream os = null;
+        try {
+            os = new FileOutputStream(file);
+            os.write(data.getBytes(encoding));
+        } finally {
+            if (null != os) {
+                os.close();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/MixAllTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/MixAllTest.java b/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/MixAllTest.java
new file mode 100644
index 0000000..72e02d0
--- /dev/null
+++ b/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/MixAllTest.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common;
+
+import junit.framework.Assert;
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.util.List;
+
+
+/**
+ * Checks the address list returned by {@link MixAll#getLocalInetAddress()}.
+ *
+ * @author lansheng.zj
+ */
+public class MixAllTest {
+
+    /**
+     * The list must contain the loopback address as well as the address that
+     * {@link InetAddress#getLocalHost()} resolves to.
+     */
+    @Test
+    public void test() throws Exception {
+        final List<String> addresses = MixAll.getLocalInetAddress();
+        final String localHostIp = InetAddress.getLocalHost().getHostAddress();
+
+        final boolean hasLoopback = addresses.contains("127.0.0.1");
+        Assert.assertTrue(hasLoopback);
+
+        final boolean hasLocalHostIp = addresses.contains(localHostIp);
+        Assert.assertTrue(hasLocalHostIp);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/RemotingUtilTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/RemotingUtilTest.java b/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/RemotingUtilTest.java
new file mode 100644
index 0000000..e6468b9
--- /dev/null
+++ b/rocketmq-common/src/test/java/com/alibaba/rocketmq/common/RemotingUtilTest.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.common;
+
+import com.alibaba.rocketmq.remoting.common.RemotingUtil;
+import org.junit.Test;
+
+
+/**
+ * Smoke test for {@link RemotingUtil#getLocalAddress()}.
+ */
+public class RemotingUtilTest {
+    /** Prints the reported local address; makes no assertions. */
+    @Test
+    public void test() throws Exception {
+        System.out.println(RemotingUtil.getLocalAddress());
+    }
+}