You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 02:44:02 UTC

[06/58] [abbrv] [partial] incubator-rocketmq git commit: ROCKETMQ-18 Rename package name from com.alibaba to org.apache

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java
new file mode 100644
index 0000000..42bb561
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.stats;
+
+import org.apache.rocketmq.common.UtilAll;
+import org.slf4j.Logger;
+
+import java.util.LinkedList;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+public class StatsItem {
+
+    /** Running total of the recorded values; callers mutate it through {@link #getValue()}. */
+    private final AtomicLong value = new AtomicLong(0);
+
+    /** Running count of recorded calls; callers mutate it through {@link #getTimes()}. */
+    private final AtomicLong times = new AtomicLong(0);
+
+    /** Snapshots appended by {@link #samplingInSeconds()}, capped at 7 entries. */
+    private final LinkedList<CallSnapshot> csListMinute = new LinkedList<CallSnapshot>();
+
+    /** Snapshots appended by {@link #samplingInMinutes()}, capped at 7 entries. */
+    private final LinkedList<CallSnapshot> csListHour = new LinkedList<CallSnapshot>();
+
+    /** Snapshots appended by {@link #samplingInHour()}, capped at 25 entries. */
+    private final LinkedList<CallSnapshot> csListDay = new LinkedList<CallSnapshot>();
+
+    private final String statsName;
+    private final String statsKey;
+    private final ScheduledExecutorService scheduledExecutorService;
+    private final Logger log;
+
+    /** Creates an item with zeroed counters; nothing is scheduled until {@link #init()} is called. */
+    public StatsItem(String statsName, String statsKey, ScheduledExecutorService scheduledExecutorService,
+                     Logger log) {
+        this.statsName = statsName;
+        this.statsKey = statsKey;
+        this.scheduledExecutorService = scheduledExecutorService;
+        this.log = log;
+    }
+
+    /** @return summary computed from the oldest and newest snapshots of the minute window */
+    public StatsSnapshot getStatsDataInMinute() {
+        return computeStatsData(this.csListMinute);
+    }
+
+    /** @return summary computed from the oldest and newest snapshots of the hour window */
+    public StatsSnapshot getStatsDataInHour() {
+        return computeStatsData(this.csListHour);
+    }
+
+    /** @return summary computed from the oldest and newest snapshots of the day window */
+    public StatsSnapshot getStatsDataInDay() {
+        return computeStatsData(this.csListDay);
+    }
+
+    /**
+     * Derives a summary from the difference between the newest and the oldest snapshot in
+     * {@code csList}, holding the list's monitor while reading. An empty list yields all zeros.
+     *
+     * @return a new snapshot with the sum, the per-second rate and the average value per call
+     */
+    private static StatsSnapshot computeStatsData(final LinkedList<CallSnapshot> csList) {
+        StatsSnapshot statsSnapshot = new StatsSnapshot();
+        synchronized (csList) {
+            double tps = 0;
+            double avgpt = 0;
+            long sum = 0;
+            if (!csList.isEmpty()) {
+                CallSnapshot first = csList.getFirst();
+                CallSnapshot last = csList.getLast();
+                sum = last.getValue() - first.getValue();
+                // With a single snapshot (or two taken in the same millisecond) the elapsed time
+                // is zero; dividing by it would report NaN or Infinity, so the rate stays at 0.
+                long elapsedMillis = last.getTimestamp() - first.getTimestamp();
+                if (elapsedMillis > 0) {
+                    tps = (sum * 1000.0d) / elapsedMillis;
+                }
+
+                long timesDiff = last.getTimes() - first.getTimes();
+                if (timesDiff > 0) {
+                    avgpt = (sum * 1.0d) / timesDiff;
+                }
+            }
+
+            statsSnapshot.setSum(sum);
+            statsSnapshot.setTps(tps);
+            statsSnapshot.setAvgpt(avgpt);
+        }
+        return statsSnapshot;
+    }
+
+    /**
+     * Schedules three sampling tasks (every 10 seconds, 10 minutes and 1 hour) and three
+     * log-printing tasks (every minute, hour and day) on the executor. Each task swallows
+     * whatever it throws because {@code scheduleAtFixedRate} suppresses all later runs of a
+     * task once one run throws.
+     */
+    public void init() {
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    samplingInSeconds();
+                } catch (Throwable ignored) {
+                }
+            }
+        }, 0, 10, TimeUnit.SECONDS);
+
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    samplingInMinutes();
+                } catch (Throwable ignored) {
+                }
+            }
+        }, 0, 10, TimeUnit.MINUTES);
+
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    samplingInHour();
+                } catch (Throwable ignored) {
+                }
+            }
+        }, 0, 1, TimeUnit.HOURS);
+
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    printAtMinutes();
+                } catch (Throwable ignored) {
+                }
+            }
+        }, Math.abs(UtilAll.computNextMinutesTimeMillis() - System.currentTimeMillis()), 1000 * 60, TimeUnit.MILLISECONDS);
+
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    printAtHour();
+                } catch (Throwable ignored) {
+                }
+            }
+        }, Math.abs(UtilAll.computNextHourTimeMillis() - System.currentTimeMillis()), 1000 * 60 * 60, TimeUnit.MILLISECONDS);
+
+        // Fires 2 s ahead of the time UtilAll computes; a negative delay is run immediately.
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    printAtDay();
+                } catch (Throwable ignored) {
+                }
+            }
+        }, Math.abs(UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis()) - 2000, 1000 * 60 * 60 * 24, TimeUnit.MILLISECONDS);
+    }
+
+    /** Appends the current counters to the minute window, keeping at most 7 snapshots. */
+    public void samplingInSeconds() {
+        this.sample(this.csListMinute, 7);
+    }
+
+    /** Appends the current counters to the hour window, keeping at most 7 snapshots. */
+    public void samplingInMinutes() {
+        this.sample(this.csListHour, 7);
+    }
+
+    /** Appends the current counters to the day window, keeping at most 25 snapshots. */
+    public void samplingInHour() {
+        this.sample(this.csListDay, 25);
+    }
+
+    /** Records a snapshot of both counters in {@code csList} and evicts the oldest beyond {@code maxSize}. */
+    private void sample(final LinkedList<CallSnapshot> csList, final int maxSize) {
+        synchronized (csList) {
+            csList.add(new CallSnapshot(System.currentTimeMillis(), this.times.get(), this.value.get()));
+            if (csList.size() > maxSize) {
+                csList.removeFirst();
+            }
+        }
+    }
+
+    /** Logs the minute-window summary at INFO level. */
+    public void printAtMinutes() {
+        this.print("[%s] [%s] Stats In One Minute, SUM: %d TPS: %.2f AVGPT: %.2f", this.csListMinute);
+    }
+
+    /** Logs the hour-window summary at INFO level. */
+    public void printAtHour() {
+        this.print("[%s] [%s] Stats In One Hour, SUM: %d TPS: %.2f AVGPT: %.2f", this.csListHour);
+    }
+
+    /** Logs the day-window summary at INFO level. */
+    public void printAtDay() {
+        this.print("[%s] [%s] Stats In One Day, SUM: %d TPS: %.2f AVGPT: %.2f", this.csListDay);
+    }
+
+    /** Formats the summary of {@code csList} with {@code format} (name, key, sum, tps, avgpt) and logs it. */
+    private void print(final String format, final LinkedList<CallSnapshot> csList) {
+        StatsSnapshot ss = computeStatsData(csList);
+        log.info(String.format(format, this.statsName, this.statsKey, ss.getSum(), ss.getTps(), ss.getAvgpt()));
+    }
+
+    public AtomicLong getValue() {
+        return value;
+    }
+
+    public String getStatsKey() {
+        return statsKey;
+    }
+
+    public String getStatsName() {
+        return statsName;
+    }
+
+    public AtomicLong getTimes() {
+        return times;
+    }
+}
+
+
+/** Immutable reading of a stats item's counters taken at one instant. */
+class CallSnapshot {
+    // All three fields are assigned exactly once, in the constructor.
+    private final long timestamp;
+    private final long times;
+    private final long value;
+
+    /** Captures the given counter readings; {@code timestamp} is stored exactly as supplied. */
+    public CallSnapshot(long timestamp, long times, long value) {
+        this.value = value;
+        this.times = times;
+        this.timestamp = timestamp;
+    }
+
+    /** @return the time of the reading */
+    public long getTimestamp() {
+        return this.timestamp;
+    }
+
+    /** @return the call counter captured in this reading */
+    public long getTimes() {
+        return this.times;
+    }
+
+    /** @return the value counter captured in this reading */
+    public long getValue() {
+        return this.value;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java
new file mode 100644
index 0000000..919745d
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.stats;
+
+import org.apache.rocketmq.common.UtilAll;
+import org.slf4j.Logger;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+
+public class StatsItemSet {
+    private final ConcurrentHashMap<String/* key */, StatsItem> statsItemTable =
+            new ConcurrentHashMap<String, StatsItem>(128);
+
+    private final String statsName;
+    private final ScheduledExecutorService scheduledExecutorService;
+    private final Logger log;
+
+    /**
+     * Creates an empty set and immediately schedules its periodic tasks via {@link #init()};
+     * {@code statsName} and {@code log} are handed to every item this set creates.
+     */
+    public StatsItemSet(String statsName, ScheduledExecutorService scheduledExecutorService, Logger log) {
+        this.statsName = statsName;
+        this.scheduledExecutorService = scheduledExecutorService;
+        this.log = log;
+        this.init();
+    }
+
+    /**
+     * Schedules three sampling tasks (every 10 seconds, 10 minutes and 1 hour) and three
+     * log-printing tasks (every minute, hour and day), each of which fans out to all items.
+     * Each task swallows whatever it throws because {@code scheduleAtFixedRate} suppresses
+     * all later runs of a task once one run throws.
+     */
+    public void init() {
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    samplingInSeconds();
+                } catch (Throwable ignored) {
+                }
+            }
+        }, 0, 10, TimeUnit.SECONDS);
+
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    samplingInMinutes();
+                } catch (Throwable ignored) {
+                }
+            }
+        }, 0, 10, TimeUnit.MINUTES);
+
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    samplingInHour();
+                } catch (Throwable ignored) {
+                }
+            }
+        }, 0, 1, TimeUnit.HOURS);
+
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    printAtMinutes();
+                } catch (Throwable ignored) {
+                }
+            }
+        }, Math.abs(UtilAll.computNextMinutesTimeMillis() - System.currentTimeMillis()), 1000 * 60, TimeUnit.MILLISECONDS);
+
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    printAtHour();
+                } catch (Throwable ignored) {
+                }
+            }
+        }, Math.abs(UtilAll.computNextHourTimeMillis() - System.currentTimeMillis()), 1000 * 60 * 60, TimeUnit.MILLISECONDS);
+
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    printAtDay();
+                } catch (Throwable ignored) {
+                }
+            }
+        }, Math.abs(UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis()), 1000 * 60 * 60 * 24, TimeUnit.MILLISECONDS);
+    }
+
+    /** Takes a minute-window sample on every item currently in the table. */
+    private void samplingInSeconds() {
+        for (StatsItem item : this.statsItemTable.values()) {
+            item.samplingInSeconds();
+        }
+    }
+
+    /** Takes an hour-window sample on every item currently in the table. */
+    private void samplingInMinutes() {
+        for (StatsItem item : this.statsItemTable.values()) {
+            item.samplingInMinutes();
+        }
+    }
+
+    /** Takes a day-window sample on every item currently in the table. */
+    private void samplingInHour() {
+        for (StatsItem item : this.statsItemTable.values()) {
+            item.samplingInHour();
+        }
+    }
+
+    /** Logs the minute-window summary of every item currently in the table. */
+    private void printAtMinutes() {
+        for (StatsItem item : this.statsItemTable.values()) {
+            item.printAtMinutes();
+        }
+    }
+
+    /** Logs the hour-window summary of every item currently in the table. */
+    private void printAtHour() {
+        for (StatsItem item : this.statsItemTable.values()) {
+            item.printAtHour();
+        }
+    }
+
+    /** Logs the day-window summary of every item currently in the table. */
+    private void printAtDay() {
+        for (StatsItem item : this.statsItemTable.values()) {
+            item.printAtDay();
+        }
+    }
+
+    /**
+     * Adds {@code incValue} to the value counter and {@code incTimes} to the call counter of
+     * the item for {@code statsKey}, creating the item on first use.
+     */
+    public void addValue(final String statsKey, final int incValue, final int incTimes) {
+        StatsItem statsItem = this.getAndCreateStatsItem(statsKey);
+        statsItem.getValue().addAndGet(incValue);
+        statsItem.getTimes().addAndGet(incTimes);
+    }
+
+    /**
+     * Returns the item registered under {@code statsKey}, registering a new one if none exists.
+     * Registration uses {@code putIfAbsent} so that threads racing on the same key all receive
+     * the one instance that stays in the table; with a plain {@code put} a thread whose item
+     * got overwritten would keep counting into an instance that is no longer registered.
+     */
+    public StatsItem getAndCreateStatsItem(final String statsKey) {
+        StatsItem statsItem = this.statsItemTable.get(statsKey);
+        if (null == statsItem) {
+            statsItem = new StatsItem(this.statsName, statsKey, this.scheduledExecutorService, this.log);
+            StatsItem prev = this.statsItemTable.putIfAbsent(statsKey, statsItem);
+            if (null != prev) {
+                statsItem = prev;
+            }
+            // StatsItem.init() is not called here; the tasks scheduled by this set sample and print every item.
+        }
+        return statsItem;
+    }
+
+    /** @return the minute-window summary for {@code statsKey}, or an all-zero snapshot if the key is unknown */
+    public StatsSnapshot getStatsDataInMinute(final String statsKey) {
+        StatsItem statsItem = this.statsItemTable.get(statsKey);
+        return null != statsItem ? statsItem.getStatsDataInMinute() : new StatsSnapshot();
+    }
+
+    /** @return the hour-window summary for {@code statsKey}, or an all-zero snapshot if the key is unknown */
+    public StatsSnapshot getStatsDataInHour(final String statsKey) {
+        StatsItem statsItem = this.statsItemTable.get(statsKey);
+        return null != statsItem ? statsItem.getStatsDataInHour() : new StatsSnapshot();
+    }
+
+    /** @return the day-window summary for {@code statsKey}, or an all-zero snapshot if the key is unknown */
+    public StatsSnapshot getStatsDataInDay(final String statsKey) {
+        StatsItem statsItem = this.statsItemTable.get(statsKey);
+        return null != statsItem ? statsItem.getStatsDataInDay() : new StatsSnapshot();
+    }
+
+    /** @return the item registered under {@code statsKey}, or {@code null} if there is none */
+    public StatsItem getStatsItem(final String statsKey) {
+        return this.statsItemTable.get(statsKey);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/stats/StatsSnapshot.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/StatsSnapshot.java b/common/src/main/java/org/apache/rocketmq/common/stats/StatsSnapshot.java
new file mode 100644
index 0000000..652d214
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/stats/StatsSnapshot.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.common.stats;
+
+/**
+ * Mutable holder for the three figures of one statistics summary; every figure is zero until set.
+ */
+public class StatsSnapshot {
+    private long sum;
+    private double tps;
+    private double avgpt;
+
+    /** @return the stored sum */
+    public long getSum() {
+        return this.sum;
+    }
+
+    public void setSum(long sum) {
+        this.sum = sum;
+    }
+
+    /** @return the stored rate */
+    public double getTps() {
+        return this.tps;
+    }
+
+    public void setTps(double tps) {
+        this.tps = tps;
+    }
+
+    /** @return the stored average */
+    public double getAvgpt() {
+        return this.avgpt;
+    }
+
+    public void setAvgpt(double avgpt) {
+        this.avgpt = avgpt;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/subscription/SubscriptionGroupConfig.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/subscription/SubscriptionGroupConfig.java b/common/src/main/java/org/apache/rocketmq/common/subscription/SubscriptionGroupConfig.java
new file mode 100644
index 0000000..5c3a3c3
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/subscription/SubscriptionGroupConfig.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.subscription;
+
+import org.apache.rocketmq.common.MixAll;
+
+
+/**
+ * Mutable configuration bean for a subscription group, with value-based
+ * {@code equals}/{@code hashCode}/{@code toString} covering all nine fields.
+ *
+ * <p>Defaults: every boolean flag is {@code true}, {@code retryQueueNums} is 1,
+ * {@code retryMaxTimes} is 16, {@code brokerId} is {@code MixAll.MASTER_ID},
+ * {@code whichBrokerWhenConsumeSlowly} is 1 and {@code groupName} is {@code null}.
+ *
+ * @author shijia.wxr
+ */
+public class SubscriptionGroupConfig {
+
+    private String groupName;
+
+    private boolean consumeEnable = true;
+    private boolean consumeFromMinEnable = true;
+    private boolean consumeBroadcastEnable = true;
+
+    private int retryQueueNums = 1;
+    private int retryMaxTimes = 16;
+
+    private long brokerId = MixAll.MASTER_ID;
+    private long whichBrokerWhenConsumeSlowly = 1;
+    private boolean notifyConsumerIdsChangedEnable = true;
+
+    /** @return the group name, or {@code null} if none has been set */
+    public String getGroupName() {
+        return this.groupName;
+    }
+
+    /** @param groupName the group name to store */
+    public void setGroupName(String groupName) {
+        this.groupName = groupName;
+    }
+
+    /** @return the {@code consumeEnable} flag */
+    public boolean isConsumeEnable() {
+        return this.consumeEnable;
+    }
+
+    /** @param consumeEnable new value of the flag */
+    public void setConsumeEnable(boolean consumeEnable) {
+        this.consumeEnable = consumeEnable;
+    }
+
+    /** @return the {@code consumeFromMinEnable} flag */
+    public boolean isConsumeFromMinEnable() {
+        return this.consumeFromMinEnable;
+    }
+
+    /** @param consumeFromMinEnable new value of the flag */
+    public void setConsumeFromMinEnable(boolean consumeFromMinEnable) {
+        this.consumeFromMinEnable = consumeFromMinEnable;
+    }
+
+    /** @return the {@code consumeBroadcastEnable} flag */
+    public boolean isConsumeBroadcastEnable() {
+        return this.consumeBroadcastEnable;
+    }
+
+    /** @param consumeBroadcastEnable new value of the flag */
+    public void setConsumeBroadcastEnable(boolean consumeBroadcastEnable) {
+        this.consumeBroadcastEnable = consumeBroadcastEnable;
+    }
+
+    /** @return the configured number of retry queues */
+    public int getRetryQueueNums() {
+        return this.retryQueueNums;
+    }
+
+    /** @param retryQueueNums the number of retry queues to store */
+    public void setRetryQueueNums(int retryQueueNums) {
+        this.retryQueueNums = retryQueueNums;
+    }
+
+    /** @return the configured maximum number of retries */
+    public int getRetryMaxTimes() {
+        return this.retryMaxTimes;
+    }
+
+    /** @param retryMaxTimes the maximum number of retries to store */
+    public void setRetryMaxTimes(int retryMaxTimes) {
+        this.retryMaxTimes = retryMaxTimes;
+    }
+
+    /** @return the configured broker id */
+    public long getBrokerId() {
+        return this.brokerId;
+    }
+
+    /** @param brokerId the broker id to store */
+    public void setBrokerId(long brokerId) {
+        this.brokerId = brokerId;
+    }
+
+    /** @return the {@code whichBrokerWhenConsumeSlowly} setting */
+    public long getWhichBrokerWhenConsumeSlowly() {
+        return this.whichBrokerWhenConsumeSlowly;
+    }
+
+    /** @param whichBrokerWhenConsumeSlowly new value of the setting */
+    public void setWhichBrokerWhenConsumeSlowly(long whichBrokerWhenConsumeSlowly) {
+        this.whichBrokerWhenConsumeSlowly = whichBrokerWhenConsumeSlowly;
+    }
+
+    /** @return the {@code notifyConsumerIdsChangedEnable} flag */
+    public boolean isNotifyConsumerIdsChangedEnable() {
+        return this.notifyConsumerIdsChangedEnable;
+    }
+
+    /** @param notifyConsumerIdsChangedEnable new value of the flag */
+    public void setNotifyConsumerIdsChangedEnable(final boolean notifyConsumerIdsChangedEnable) {
+        this.notifyConsumerIdsChangedEnable = notifyConsumerIdsChangedEnable;
+    }
+
+    /**
+     * Folds all nine fields into a 31-based hash, using the boxed types' hash codes for primitives.
+     */
+    @Override
+    public int hashCode() {
+        int hash = 1;
+        hash = 31 * hash + Long.valueOf(brokerId).hashCode();
+        hash = 31 * hash + Boolean.valueOf(consumeBroadcastEnable).hashCode();
+        hash = 31 * hash + Boolean.valueOf(consumeEnable).hashCode();
+        hash = 31 * hash + Boolean.valueOf(consumeFromMinEnable).hashCode();
+        hash = 31 * hash + Boolean.valueOf(notifyConsumerIdsChangedEnable).hashCode();
+        hash = 31 * hash + (groupName == null ? 0 : groupName.hashCode());
+        hash = 31 * hash + retryMaxTimes;
+        hash = 31 * hash + retryQueueNums;
+        hash = 31 * hash + Long.valueOf(whichBrokerWhenConsumeSlowly).hashCode();
+        return hash;
+    }
+
+    /**
+     * Two configs are equal when they are of exactly the same class and all nine fields match.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        final SubscriptionGroupConfig that = (SubscriptionGroupConfig) obj;
+        final boolean sameName = groupName == null ? that.groupName == null : groupName.equals(that.groupName);
+        return sameName
+                && brokerId == that.brokerId
+                && consumeBroadcastEnable == that.consumeBroadcastEnable
+                && consumeEnable == that.consumeEnable
+                && consumeFromMinEnable == that.consumeFromMinEnable
+                && retryMaxTimes == that.retryMaxTimes
+                && retryQueueNums == that.retryQueueNums
+                && whichBrokerWhenConsumeSlowly == that.whichBrokerWhenConsumeSlowly
+                && notifyConsumerIdsChangedEnable == that.notifyConsumerIdsChangedEnable;
+    }
+
+    /** @return all nine fields rendered as {@code SubscriptionGroupConfig [name=value, ...]} */
+    @Override
+    public String toString() {
+        final StringBuilder sb = new StringBuilder("SubscriptionGroupConfig [groupName=").append(groupName);
+        sb.append(", consumeEnable=").append(consumeEnable);
+        sb.append(", consumeFromMinEnable=").append(consumeFromMinEnable);
+        sb.append(", consumeBroadcastEnable=").append(consumeBroadcastEnable);
+        sb.append(", retryQueueNums=").append(retryQueueNums);
+        sb.append(", retryMaxTimes=").append(retryMaxTimes);
+        sb.append(", brokerId=").append(brokerId);
+        sb.append(", whichBrokerWhenConsumeSlowly=").append(whichBrokerWhenConsumeSlowly);
+        sb.append(", notifyConsumerIdsChangedEnable=").append(notifyConsumerIdsChangedEnable);
+        return sb.append("]").toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/sysflag/MessageSysFlag.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/sysflag/MessageSysFlag.java b/common/src/main/java/org/apache/rocketmq/common/sysflag/MessageSysFlag.java
new file mode 100644
index 0000000..8a069e5
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/sysflag/MessageSysFlag.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.sysflag;
+
+/**
+ * @author shijia.wxr
+ */
+public class MessageSysFlag {
+    public final static int COMPRESSED_FLAG = 1;                // bit 0
+    public final static int MULTI_TAGS_FLAG = 1 << 1;           // bit 1
+    public final static int TRANSACTION_NOT_TYPE = 0;           // bits 2-3 both clear
+    public final static int TRANSACTION_PREPARED_TYPE = 1 << 2; // bit 2
+    public final static int TRANSACTION_COMMIT_TYPE = 2 << 2;   // bit 3
+    public final static int TRANSACTION_ROLLBACK_TYPE = 3 << 2; // bits 2-3; also used as the mask
+
+    /** @return only the transaction-type bits (bits 2-3) of {@code flag} */
+    public static int getTransactionValue(final int flag) {
+        return TRANSACTION_ROLLBACK_TYPE & flag;
+    }
+
+    /** @return {@code flag} with bits 2-3 cleared and then OR-ed with {@code type} */
+    public static int resetTransactionValue(final int flag, final int type) {
+        return type | (flag & ~TRANSACTION_ROLLBACK_TYPE);
+    }
+
+    /** @return {@code flag} with the compressed bit (bit 0) cleared */
+    public static int clearCompressedFlag(final int flag) {
+        return flag & ~COMPRESSED_FLAG;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/sysflag/PullSysFlag.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/sysflag/PullSysFlag.java b/common/src/main/java/org/apache/rocketmq/common/sysflag/PullSysFlag.java
new file mode 100644
index 0000000..cc2a5c8
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/sysflag/PullSysFlag.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.common.sysflag;
+
+/**
+ * Packs four boolean options into the low four bits of an {@code int} system flag and reads them back.
+ *
+ * @author shijia.wxr
+ */
+public class PullSysFlag {
+    private final static int FLAG_COMMIT_OFFSET = 1;
+    private final static int FLAG_SUSPEND = 1 << 1;
+    private final static int FLAG_SUBSCRIPTION = 1 << 2;
+    private final static int FLAG_CLASS_FILTER = 1 << 3;
+
+    /**
+     * Builds a flag word with one bit per option that is {@code true}.
+     *
+     * @param commitOffset sets bit 0
+     * @param suspend sets bit 1
+     * @param subscription sets bit 2
+     * @param classFilter sets bit 3
+     * @return the combined flag word; {@code 0} when every option is {@code false}
+     */
+    public static int buildSysFlag(final boolean commitOffset, final boolean suspend,
+                                   final boolean subscription, final boolean classFilter) {
+        return (commitOffset ? FLAG_COMMIT_OFFSET : 0)
+                | (suspend ? FLAG_SUSPEND : 0)
+                | (subscription ? FLAG_SUBSCRIPTION : 0)
+                | (classFilter ? FLAG_CLASS_FILTER : 0);
+    }
+
+    /** @return {@code sysFlag} with the commit-offset bit (bit 0) cleared */
+    public static int clearCommitOffsetFlag(final int sysFlag) {
+        return sysFlag & ~FLAG_COMMIT_OFFSET;
+    }
+
+    /** @return whether the commit-offset bit (bit 0) is set in {@code sysFlag} */
+    public static boolean hasCommitOffsetFlag(final int sysFlag) {
+        return isSet(sysFlag, FLAG_COMMIT_OFFSET);
+    }
+
+    /** @return whether the suspend bit (bit 1) is set in {@code sysFlag} */
+    public static boolean hasSuspendFlag(final int sysFlag) {
+        return isSet(sysFlag, FLAG_SUSPEND);
+    }
+
+    /** @return whether the subscription bit (bit 2) is set in {@code sysFlag} */
+    public static boolean hasSubscriptionFlag(final int sysFlag) {
+        return isSet(sysFlag, FLAG_SUBSCRIPTION);
+    }
+
+    /** @return whether the class-filter bit (bit 3) is set in {@code sysFlag} */
+    public static boolean hasClassFilterFlag(final int sysFlag) {
+        return isSet(sysFlag, FLAG_CLASS_FILTER);
+    }
+
+    /** @return whether every bit of {@code mask} is set in {@code sysFlag} */
+    private static boolean isSet(final int sysFlag, final int mask) {
+        return (sysFlag & mask) == mask;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/sysflag/SubscriptionSysFlag.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/sysflag/SubscriptionSysFlag.java b/common/src/main/java/org/apache/rocketmq/common/sysflag/SubscriptionSysFlag.java
new file mode 100644
index 0000000..2761a0b
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/sysflag/SubscriptionSysFlag.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.sysflag;
+
+/**
+ * Static helpers that build and query the bit mask used as a subscription system flag.
+ *
+ * @author manhong.yqd
+ */
+public class SubscriptionSysFlag {
+
+    /** The "unit" marker; occupies bit 0 of the flag word. */
+    private final static int FLAG_UNIT = 0x1 << 0;
+
+
+    /**
+     * Creates a flag word from scratch.
+     *
+     * @param unit whether the unit bit should be set
+     * @return {@code FLAG_UNIT} when {@code unit} is {@code true}, otherwise 0
+     */
+    public static int buildSysFlag(final boolean unit) {
+        return unit ? FLAG_UNIT : 0;
+    }
+
+
+    /**
+     * Switches the unit bit on.
+     *
+     * @param sysFlag the flag word to start from
+     * @return {@code sysFlag} with the unit bit set; other bits are untouched
+     */
+    public static int setUnitFlag(final int sysFlag) {
+        final int withUnit = sysFlag | FLAG_UNIT;
+        return withUnit;
+    }
+
+
+    /**
+     * Switches the unit bit off.
+     *
+     * @param sysFlag the flag word to start from
+     * @return {@code sysFlag} with the unit bit cleared; other bits are untouched
+     */
+    public static int clearUnitFlag(final int sysFlag) {
+        final int keepMask = ~FLAG_UNIT;
+        return sysFlag & keepMask;
+    }
+
+
+    /**
+     * Tests the unit bit.
+     *
+     * @param sysFlag the flag word to inspect
+     * @return {@code true} when the unit bit is set
+     */
+    public static boolean hasUnitFlag(final int sysFlag) {
+        // FLAG_UNIT is a single bit, so a non-zero AND means it is set.
+        return (sysFlag & FLAG_UNIT) != 0;
+    }
+
+
+    /**
+     * Empty entry point; it performs no work.
+     *
+     * @param args ignored
+     */
+    public static void main(String[] args) {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/sysflag/TopicSysFlag.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/sysflag/TopicSysFlag.java b/common/src/main/java/org/apache/rocketmq/common/sysflag/TopicSysFlag.java
new file mode 100644
index 0000000..b12108a
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/sysflag/TopicSysFlag.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.sysflag;
+
+/**
+ * Static helpers that build and query the bit mask used as a topic system flag.
+ *
+ * @author manhong.yqd
+ */
+public class TopicSysFlag {
+
+    /** The "unit" marker; occupies bit 0 of the flag word. */
+    private final static int FLAG_UNIT = 0x1 << 0;
+
+    /** The "unit sub" marker; occupies bit 1 of the flag word. */
+    private final static int FLAG_UNIT_SUB = 0x1 << 1;
+
+
+    /**
+     * Creates a flag word from scratch.
+     *
+     * @param unit whether the unit bit should be set
+     * @param hasUnitSub whether the unit-sub bit should be set
+     * @return the OR of the selected bits, or 0 when both arguments are {@code false}
+     */
+    public static int buildSysFlag(final boolean unit, final boolean hasUnitSub) {
+        final int unitBit = unit ? FLAG_UNIT : 0;
+        final int unitSubBit = hasUnitSub ? FLAG_UNIT_SUB : 0;
+        return unitBit | unitSubBit;
+    }
+
+
+    /**
+     * Switches the unit bit on.
+     *
+     * @param sysFlag the flag word to start from
+     * @return {@code sysFlag} with the unit bit set; other bits are untouched
+     */
+    public static int setUnitFlag(final int sysFlag) {
+        final int withUnit = sysFlag | FLAG_UNIT;
+        return withUnit;
+    }
+
+
+    /**
+     * Switches the unit bit off.
+     *
+     * @param sysFlag the flag word to start from
+     * @return {@code sysFlag} with the unit bit cleared; other bits are untouched
+     */
+    public static int clearUnitFlag(final int sysFlag) {
+        final int keepMask = ~FLAG_UNIT;
+        return sysFlag & keepMask;
+    }
+
+
+    /**
+     * Tests the unit bit.
+     *
+     * @param sysFlag the flag word to inspect
+     * @return {@code true} when the unit bit is set
+     */
+    public static boolean hasUnitFlag(final int sysFlag) {
+        // FLAG_UNIT is a single bit, so a non-zero AND means it is set.
+        return (sysFlag & FLAG_UNIT) != 0;
+    }
+
+
+    /**
+     * Switches the unit-sub bit on.
+     *
+     * @param sysFlag the flag word to start from
+     * @return {@code sysFlag} with the unit-sub bit set; other bits are untouched
+     */
+    public static int setUnitSubFlag(final int sysFlag) {
+        final int withUnitSub = sysFlag | FLAG_UNIT_SUB;
+        return withUnitSub;
+    }
+
+
+    /**
+     * Switches the unit-sub bit off.
+     *
+     * @param sysFlag the flag word to start from
+     * @return {@code sysFlag} with the unit-sub bit cleared; other bits are untouched
+     */
+    public static int clearUnitSubFlag(final int sysFlag) {
+        final int keepMask = ~FLAG_UNIT_SUB;
+        return sysFlag & keepMask;
+    }
+
+
+    /**
+     * Tests the unit-sub bit.
+     *
+     * @param sysFlag the flag word to inspect
+     * @return {@code true} when the unit-sub bit is set
+     */
+    public static boolean hasUnitSubFlag(final int sysFlag) {
+        // FLAG_UNIT_SUB is a single bit, so a non-zero AND means it is set.
+        return (sysFlag & FLAG_UNIT_SUB) != 0;
+    }
+
+
+    /**
+     * Empty entry point; it performs no work.
+     *
+     * @param args ignored
+     */
+    public static void main(String[] args) {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/utils/ChannelUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/ChannelUtil.java b/common/src/main/java/org/apache/rocketmq/common/utils/ChannelUtil.java
new file mode 100644
index 0000000..ab017f2
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/utils/ChannelUtil.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.common.utils;
+
+import io.netty.channel.Channel;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+/**
+ * Helper for reading address information from a Netty {@link Channel}.
+ */
+public class ChannelUtil {
+    /**
+     * Returns a textual form of the remote peer's address.
+     *
+     * @param channel the channel whose remote address is inspected; its remote address is
+     *                cast to {@link InetSocketAddress}
+     * @return an empty string when the channel reports no remote address; otherwise the
+     *         host address of the peer's {@link InetAddress}, or the socket address's host
+     *         name when no {@link InetAddress} is available
+     */
+    public static String getRemoteIp(Channel channel) {
+        final InetSocketAddress remote = (InetSocketAddress) channel.remoteAddress();
+        if (remote == null) {
+            return "";
+        }
+
+        final InetAddress resolved = remote.getAddress();
+        if (resolved == null) {
+            // No InetAddress attached to the socket address: fall back to the host name.
+            return remote.getHostName();
+        }
+        return resolved.getHostAddress();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/utils/HttpTinyClient.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/HttpTinyClient.java b/common/src/main/java/org/apache/rocketmq/common/utils/HttpTinyClient.java
new file mode 100755
index 0000000..fcd002c
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/utils/HttpTinyClient.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.common.utils;
+
+import org.apache.rocketmq.common.MQVersion;
+import org.apache.rocketmq.common.MixAll;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.util.Iterator;
+import java.util.List;
+
+
+/**
+ * Minimal HTTP GET/POST client built directly on {@link HttpURLConnection}.
+ */
+public class HttpTinyClient {
+
+    /**
+     * Sends an HTTP GET request and reads the whole response body.
+     *
+     * @param url target URL without a query string; the encoded parameters are appended
+     *            after a {@code ?}
+     * @param headers flat list of alternating header name / header value entries; may be
+     *                {@code null}
+     * @param paramValues flat list of alternating parameter name / parameter value
+     *                    entries; may be {@code null}, in which case no query string is added
+     * @param encoding charset name used to URL-encode parameter values and to decode the
+     *                 response body
+     * @param readTimeoutMs used as both the connect timeout and the read timeout, in
+     *                      milliseconds
+     * @return the response status code together with the response body
+     * @throws IOException if the request cannot be sent or the response cannot be read
+     */
+    static public HttpResult httpGet(String url, List<String> headers, List<String> paramValues,
+                                     String encoding, long readTimeoutMs) throws IOException {
+        String encodedContent = encodingParams(paramValues, encoding);
+        url += (null == encodedContent) ? "" : ("?" + encodedContent);
+
+        HttpURLConnection conn = null;
+        try {
+            conn = (HttpURLConnection) new URL(url).openConnection();
+            conn.setRequestMethod("GET");
+            conn.setConnectTimeout((int) readTimeoutMs);
+            conn.setReadTimeout((int) readTimeoutMs);
+            setHeaders(conn, headers, encoding);
+
+            conn.connect();
+            return readResult(conn, encoding);
+        } finally {
+            if (conn != null) {
+                conn.disconnect();
+            }
+        }
+    }
+
+    /**
+     * Joins alternating name / value entries into {@code name=value&name=value} form.
+     * Only the values are URL-encoded; names are appended as given.
+     *
+     * @param paramValues flat list of alternating names and values; must hold an even
+     *                    number of entries, because every name is followed by a read of
+     *                    its value
+     * @param encoding charset name passed to {@link URLEncoder#encode(String, String)}
+     * @return the joined string, or {@code null} when {@code paramValues} is {@code null}
+     * @throws UnsupportedEncodingException if {@code encoding} is not supported
+     */
+    static private String encodingParams(List<String> paramValues, String encoding)
+            throws UnsupportedEncodingException {
+        if (null == paramValues) {
+            return null;
+        }
+
+        StringBuilder sb = new StringBuilder();
+        for (Iterator<String> iter = paramValues.iterator(); iter.hasNext(); ) {
+            sb.append(iter.next()).append("=");
+            sb.append(URLEncoder.encode(iter.next(), encoding));
+            if (iter.hasNext()) {
+                sb.append("&");
+            }
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Adds the caller's headers plus the fixed {@code Client-Version},
+     * {@code Content-Type} and {@code Metaq-Client-RequestTS} headers to the request.
+     *
+     * @param conn the connection to configure; must not be connected yet
+     * @param headers flat list of alternating header names and values; may be {@code null}
+     * @param encoding charset name advertised in the {@code Content-Type} header
+     */
+    static private void setHeaders(HttpURLConnection conn, List<String> headers, String encoding) {
+        if (null != headers) {
+            for (Iterator<String> iter = headers.iterator(); iter.hasNext(); ) {
+                conn.addRequestProperty(iter.next(), iter.next());
+            }
+        }
+        conn.addRequestProperty("Client-Version", MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION));
+        conn.addRequestProperty("Content-Type", "application/x-www-form-urlencoded;charset=" + encoding);
+
+        // Request timestamp in milliseconds since the epoch.
+        String ts = String.valueOf(System.currentTimeMillis());
+        conn.addRequestProperty("Metaq-Client-RequestTS", ts);
+    }
+
+    /**
+     * Reads the status code and the body of a response.
+     *
+     * @param conn the connection on which the request has been issued
+     * @param encoding charset name used to decode the body
+     * @return the status code with the body; the body is an empty string when the
+     *         connection offers no stream to read
+     * @throws IOException if the response cannot be read
+     */
+    static private HttpResult readResult(HttpURLConnection conn, String encoding) throws IOException {
+        int respCode = conn.getResponseCode();
+        java.io.InputStream body = (HttpURLConnection.HTTP_OK == respCode)
+                ? conn.getInputStream()
+                : conn.getErrorStream();
+        // getErrorStream() returns null when there is no error body to read; handing that
+        // null to the reader would fail with a NullPointerException.
+        String resp = (null == body) ? "" : IOTinyUtils.toString(body, encoding);
+        return new HttpResult(respCode, resp);
+    }
+
+    /**
+     * Sends an HTTP POST request with the parameters as a form-encoded body and reads the
+     * whole response body.
+     *
+     * @param url target URL
+     * @param headers flat list of alternating header name / header value entries; may be
+     *                {@code null}
+     * @param paramValues flat list of alternating parameter name / parameter value
+     *                    entries; may be {@code null}, in which case an empty body is sent
+     * @param encoding charset name used to URL-encode parameter values and to decode the
+     *                 response body
+     * @param readTimeoutMs read timeout in milliseconds; the connect timeout is fixed at
+     *                      3000 ms
+     *
+     * @return the http response of given http post request
+     *
+     * @throws java.io.IOException if the request cannot be sent or the response cannot be read
+     */
+    static public HttpResult httpPost(String url, List<String> headers, List<String> paramValues,
+                                      String encoding, long readTimeoutMs) throws IOException {
+        String encodedContent = encodingParams(paramValues, encoding);
+
+        HttpURLConnection conn = null;
+        try {
+            conn = (HttpURLConnection) new URL(url).openConnection();
+            conn.setRequestMethod("POST");
+            conn.setConnectTimeout(3000);
+            conn.setReadTimeout((int) readTimeoutMs);
+            conn.setDoOutput(true);
+            conn.setDoInput(true);
+            setHeaders(conn, headers, encoding);
+
+            java.io.OutputStream out = conn.getOutputStream();
+            try {
+                // encodedContent is null when no parameters were supplied; send an empty body.
+                if (null != encodedContent) {
+                    out.write(encodedContent.getBytes(MixAll.DEFAULT_CHARSET));
+                }
+                out.flush();
+            } finally {
+                out.close();
+            }
+
+            return readResult(conn, encoding);
+        } finally {
+            if (null != conn) {
+                conn.disconnect();
+            }
+        }
+    }
+
+    /**
+     * Immutable pair of HTTP status code and response body.
+     */
+    static public class HttpResult {
+        /** HTTP status code of the response. */
+        final public int code;
+        /** Decoded response body. */
+        final public String content;
+
+
+        /**
+         * @param code HTTP status code of the response
+         * @param content decoded response body
+         */
+        public HttpResult(int code, String content) {
+            this.code = code;
+            this.content = content;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/utils/IOTinyUtils.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/IOTinyUtils.java b/common/src/main/java/org/apache/rocketmq/common/utils/IOTinyUtils.java
new file mode 100644
index 0000000..3284759
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/utils/IOTinyUtils.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.common.utils;
+
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+
+import java.io.*;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * Small stream, reader and file helpers.
+ *
+ * @author manhong.yqd
+ */
+public class IOTinyUtils {
+
+    /**
+     * Reads the whole stream into a string. The stream is not closed.
+     *
+     * @param input the stream to drain
+     * @param encoding charset name used for decoding; when {@code null},
+     *                 {@code RemotingHelper.DEFAULT_CHARSET} is used
+     * @return the decoded content
+     * @throws IOException if reading fails or the charset is not supported
+     */
+    static public String toString(InputStream input, String encoding) throws IOException {
+        return (null == encoding) ? toString(new InputStreamReader(input, RemotingHelper.DEFAULT_CHARSET)) : toString(new InputStreamReader(
+                input, encoding));
+    }
+
+
+    /**
+     * Reads everything the reader delivers into a string. The reader is not closed.
+     *
+     * @param reader the reader to drain
+     * @return the characters read
+     * @throws IOException if reading fails
+     */
+    static public String toString(Reader reader) throws IOException {
+        CharArrayWriter sw = new CharArrayWriter();
+        copy(reader, sw);
+        return sw.toString();
+    }
+
+
+    /**
+     * Copies all characters from {@code input} to {@code output} through a 4096-char buffer.
+     * Neither side is closed.
+     *
+     * @param input the source
+     * @param output the destination
+     * @return the number of characters copied
+     * @throws IOException if reading or writing fails
+     */
+    static public long copy(Reader input, Writer output) throws IOException {
+        char[] buffer = new char[1 << 12];
+        long count = 0;
+        // read() reports end of input with a negative value.
+        for (int n = 0; (n = input.read(buffer)) >= 0; ) {
+            output.write(buffer, 0, n);
+            count += n;
+        }
+        return count;
+    }
+
+
+    /**
+     * Reads the input line by line until it is exhausted. The reader is not closed.
+     *
+     * @param input the source; wrapped in a {@link BufferedReader} unless it already is one
+     * @return the lines in reading order
+     * @throws IOException if reading fails
+     */
+    static public List<String> readLines(Reader input) throws IOException {
+        BufferedReader reader = toBufferedReader(input);
+        List<String> list = new ArrayList<String>();
+        String line = reader.readLine();
+        while (null != line) {
+            list.add(line);
+            line = reader.readLine();
+        }
+        return list;
+    }
+
+
+    /**
+     * Returns the reader itself when it is already buffered, otherwise a buffering wrapper.
+     */
+    static private BufferedReader toBufferedReader(Reader reader) {
+        return reader instanceof BufferedReader ? (BufferedReader) reader : new BufferedReader(reader);
+    }
+
+
+    /**
+     * Copies a file, creating the target and its parent directories when needed. Opening
+     * the target for writing discards any content it already holds.
+     *
+     * @param source path of the file to read
+     * @param target path of the file to write
+     * @throws IllegalArgumentException if the source does not exist
+     * @throws RuntimeException if the target file cannot be created
+     * @throws IOException if reading or writing fails
+     */
+    static public void copyFile(String source, String target) throws IOException {
+        File sf = new File(source);
+        if (!sf.exists()) {
+            throw new IllegalArgumentException("source file does not exist.");
+        }
+        File tf = new File(target);
+        // getParentFile() is null for a bare file name; there is nothing to create then.
+        File parent = tf.getParentFile();
+        if (null != parent) {
+            parent.mkdirs();
+        }
+        if (!tf.exists() && !tf.createNewFile()) {
+            throw new RuntimeException("failed to create target file.");
+        }
+
+        FileChannel sc = null;
+        FileChannel tc = null;
+        try {
+            tc = new FileOutputStream(tf).getChannel();
+            sc = new FileInputStream(sf).getChannel();
+            // transferTo may move fewer bytes than requested, so repeat until done.
+            long size = sc.size();
+            long position = 0;
+            while (position < size) {
+                long transferred = sc.transferTo(position, size - position, tc);
+                if (transferred <= 0) {
+                    // No progress (for example the source shrank); stop instead of spinning.
+                    break;
+                }
+                position += transferred;
+            }
+        } finally {
+            // Close the target even when closing the source fails.
+            try {
+                if (null != sc) {
+                    sc.close();
+                }
+            } finally {
+                if (null != tc) {
+                    tc.close();
+                }
+            }
+        }
+    }
+
+
+    /**
+     * Deletes a file, or a directory together with its content. A {@code null} argument
+     * is ignored. The result of the final {@link File#delete()} call is not checked.
+     *
+     * @param fileOrDir the file or directory to remove; may be {@code null}
+     * @throws IOException if the content of a directory cannot be listed
+     */
+    public static void delete(File fileOrDir) throws IOException {
+        if (fileOrDir == null) {
+            return;
+        }
+
+        if (fileOrDir.isDirectory()) {
+            cleanDirectory(fileOrDir);
+        }
+
+        fileOrDir.delete();
+    }
+
+
+    /**
+     * Deletes everything inside a directory but keeps the directory itself.
+     *
+     * @param directory the directory to empty
+     * @throws IllegalArgumentException if {@code directory} does not exist or is not a
+     *                                  directory
+     * @throws IOException if the content cannot be listed, or if deleting an entry failed;
+     *                     when several entries fail, the last failure is the one thrown
+     */
+    public static void cleanDirectory(File directory) throws IOException {
+        if (!directory.exists()) {
+            String message = directory + " does not exist";
+            throw new IllegalArgumentException(message);
+        }
+
+        if (!directory.isDirectory()) {
+            String message = directory + " is not a directory";
+            throw new IllegalArgumentException(message);
+        }
+
+        File[] files = directory.listFiles();
+        if (files == null) { // null if security restricted
+            throw new IOException("Failed to list contents of " + directory);
+        }
+
+        // Keep going after a failure so that as many entries as possible are removed.
+        IOException exception = null;
+        for (File file : files) {
+            try {
+                delete(file);
+            } catch (IOException ioe) {
+                exception = ioe;
+            }
+        }
+
+        if (null != exception) {
+            throw exception;
+        }
+    }
+
+
+    /**
+     * Writes a string to a file, replacing any existing content.
+     *
+     * @param file the file to write
+     * @param data the text to write
+     * @param encoding charset name used to encode {@code data}
+     * @throws IOException if the file cannot be opened or written, or the charset is not
+     *                     supported
+     */
+    public static void writeStringToFile(File file, String data, String encoding) throws IOException {
+        OutputStream os = null;
+        try {
+            os = new FileOutputStream(file);
+            os.write(data.getBytes(encoding));
+        } finally {
+            if (null != os) {
+                os.close();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/test/java/com/alibaba/rocketmq/common/MixAllTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/com/alibaba/rocketmq/common/MixAllTest.java b/common/src/test/java/com/alibaba/rocketmq/common/MixAllTest.java
deleted file mode 100644
index 72e02d0..0000000
--- a/common/src/test/java/com/alibaba/rocketmq/common/MixAllTest.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package com.alibaba.rocketmq.common;
-
-import junit.framework.Assert;
-import org.junit.Test;
-
-import java.net.InetAddress;
-import java.util.List;
-
-
-/**
- * @author lansheng.zj
- */
-public class MixAllTest {
-
-    @Test
-    public void test() throws Exception {
-        List<String> localInetAddress = MixAll.getLocalInetAddress();
-        String local = InetAddress.getLocalHost().getHostAddress();
-        Assert.assertTrue(localInetAddress.contains("127.0.0.1"));
-        Assert.assertTrue(localInetAddress.contains(local));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/test/java/com/alibaba/rocketmq/common/RemotingUtilTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/com/alibaba/rocketmq/common/RemotingUtilTest.java b/common/src/test/java/com/alibaba/rocketmq/common/RemotingUtilTest.java
deleted file mode 100644
index e6468b9..0000000
--- a/common/src/test/java/com/alibaba/rocketmq/common/RemotingUtilTest.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.common;
-
-import com.alibaba.rocketmq.remoting.common.RemotingUtil;
-import org.junit.Test;
-
-
-public class RemotingUtilTest {
-    @Test
-    public void test() throws Exception {
-        String a = RemotingUtil.getLocalAddress();
-        System.out.println(a);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/test/java/com/alibaba/rocketmq/common/UtilAllTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/com/alibaba/rocketmq/common/UtilAllTest.java b/common/src/test/java/com/alibaba/rocketmq/common/UtilAllTest.java
deleted file mode 100644
index 7764fcc..0000000
--- a/common/src/test/java/com/alibaba/rocketmq/common/UtilAllTest.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package com.alibaba.rocketmq.common;
-
-import org.junit.Test;
-
-import java.net.URL;
-import java.util.Properties;
-
-import static org.junit.Assert.assertTrue;
-
-
-public class UtilAllTest {
-
-    @Test
-    public void test_currentStackTrace() {
-        System.out.println(UtilAll.currentStackTrace());
-    }
-
-
-    @Test
-    public void test_a() {
-        URL url = this.getClass().getProtectionDomain().getCodeSource().getLocation();
-        System.out.println(url);
-        System.out.println(url.getPath());
-    }
-
-
-    @Test
-    public void test_resetClassProperties() {
-        DemoConfig demoConfig = new DemoConfig();
-        MixAll.properties2Object(new Properties(), demoConfig);
-    }
-
-
-    @Test
-    public void test_properties2String() {
-        DemoConfig demoConfig = new DemoConfig();
-        Properties properties = MixAll.object2Properties(demoConfig);
-        System.out.println(MixAll.properties2String(properties));
-    }
-
-
-    @Test
-    public void test_timeMillisToHumanString() {
-        System.out.println(UtilAll.timeMillisToHumanString());
-    }
-
-
-    @Test
-    public void test_isPropertiesEqual() {
-        final Properties p1 = new Properties();
-        final Properties p2 = new Properties();
-
-        p1.setProperty("a", "1");
-        p1.setProperty("b", "2");
-
-        p2.setProperty("a", "1");
-        p2.setProperty("b", "2");
-        // p2.setProperty("c", "3");
-
-        assertTrue(MixAll.isPropertiesEqual(p1, p2));
-    }
-
-
-    @Test
-    public void test_getpid() {
-        int pid = UtilAll.getPid();
-
-        System.out.println("PID = " + pid);
-        assertTrue(pid > 0);
-    }
-
-
-    @Test
-    public void test_isBlank() {
-        {
-            boolean result = UtilAll.isBlank("Hello ");
-            assertTrue(!result);
-        }
-
-        {
-            boolean result = UtilAll.isBlank(" Hello");
-            assertTrue(!result);
-        }
-
-        {
-            boolean result = UtilAll.isBlank("He llo");
-            assertTrue(!result);
-        }
-
-        {
-            boolean result = UtilAll.isBlank("  ");
-            assertTrue(result);
-        }
-
-        {
-            boolean result = UtilAll.isBlank("Hello");
-            assertTrue(!result);
-        }
-    }
-
-    static class DemoConfig {
-        private int demoWidth = 0;
-        private int demoLength = 0;
-        private boolean demoOK = false;
-        private String demoName = "haha";
-
-
-        public int getDemoWidth() {
-            return demoWidth;
-        }
-
-
-        public void setDemoWidth(int demoWidth) {
-            this.demoWidth = demoWidth;
-        }
-
-
-        public int getDemoLength() {
-            return demoLength;
-        }
-
-
-        public void setDemoLength(int demoLength) {
-            this.demoLength = demoLength;
-        }
-
-
-        public boolean isDemoOK() {
-            return demoOK;
-        }
-
-
-        public void setDemoOK(boolean demoOK) {
-            this.demoOK = demoOK;
-        }
-
-
-        public String getDemoName() {
-            return demoName;
-        }
-
-
-        public void setDemoNfieldame(String demoName) {
-            this.demoName = demoName;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/test/java/com/alibaba/rocketmq/common/filter/FilterAPITest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/com/alibaba/rocketmq/common/filter/FilterAPITest.java b/common/src/test/java/com/alibaba/rocketmq/common/filter/FilterAPITest.java
deleted file mode 100644
index e45873b..0000000
--- a/common/src/test/java/com/alibaba/rocketmq/common/filter/FilterAPITest.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package com.alibaba.rocketmq.common.filter;
-
-import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
-import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
-import org.junit.Test;
-
-
-/**
- * @author shijia.wxr
- *
- */
-public class FilterAPITest {
-
-    @Test
-    public void testBuildSubscriptionData() throws Exception {
-        SubscriptionData subscriptionData =
-                FilterAPI.buildSubscriptionData("ConsumerGroup1", "TestTopic", "TAG1 || Tag2 || tag3");
-        System.out.println(subscriptionData);
-    }
-
-    @Test
-    public void testSubscriptionData() throws Exception {
-        SubscriptionData subscriptionData =
-                FilterAPI.buildSubscriptionData("ConsumerGroup1", "TestTopic", "TAG1 || Tag2 || tag3");
-        subscriptionData.setFilterClassSource("java hello");
-        String json = RemotingSerializable.toJson(subscriptionData, true);
-        System.out.println(json);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/test/java/com/alibaba/rocketmq/common/filter/PolishExprTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/com/alibaba/rocketmq/common/filter/PolishExprTest.java b/common/src/test/java/com/alibaba/rocketmq/common/filter/PolishExprTest.java
deleted file mode 100644
index 612df69..0000000
--- a/common/src/test/java/com/alibaba/rocketmq/common/filter/PolishExprTest.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package com.alibaba.rocketmq.common.filter;
-
-import com.alibaba.rocketmq.common.filter.impl.Op;
-import com.alibaba.rocketmq.common.filter.impl.PolishExpr;
-import junit.framework.Assert;
-import org.junit.Test;
-
-import java.util.List;
-
-
-/**
- * @author lansheng.zj
- */
-public class PolishExprTest {
-
-    private String expression = "tag1||(tag2&&tag3)&&tag4||tag5&&(tag6 && tag7)|| tag8    && tag9";
-    private PolishExpr polishExpr;
-
-
-    public void init() {
-        polishExpr = new PolishExpr();
-    }
-
-
-    @Test
-    public void testReversePolish() {
-        List<Op> antiPolishExpression = polishExpr.reversePolish(expression);
-        System.out.println(antiPolishExpression);
-    }
-
-
-    @Test
-    public void testReversePolish_Performance() {
-        // prepare
-        for (int i = 0; i < 100000; i++) {
-            polishExpr.reversePolish(expression);
-        }
-
-        long start = System.currentTimeMillis();
-        for (int i = 0; i < 100000; i++) {
-            polishExpr.reversePolish(expression);
-        }
-        long cost = System.currentTimeMillis() - start;
-        System.out.println(cost);
-        // System.out.println(cost / 100000F);
-
-        Assert.assertTrue(cost < 500);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/test/java/com/alibaba/rocketmq/common/protocol/ConsumeStatusTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/com/alibaba/rocketmq/common/protocol/ConsumeStatusTest.java b/common/src/test/java/com/alibaba/rocketmq/common/protocol/ConsumeStatusTest.java
deleted file mode 100644
index 32e3d98..0000000
--- a/common/src/test/java/com/alibaba/rocketmq/common/protocol/ConsumeStatusTest.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package com.alibaba.rocketmq.common.protocol;
-
-import com.alibaba.rocketmq.common.protocol.body.ConsumeStatus;
-import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
-import org.junit.Test;
-
-
-public class ConsumeStatusTest {
-
-    @Test
-    public void decode_test() throws Exception {
-        ConsumeStatus cs = new ConsumeStatus();
-        cs.setConsumeFailedTPS(0L);
-        String json = RemotingSerializable.toJson(cs, true);
-        System.out.println(json);
-        ConsumeStatus fromJson = RemotingSerializable.fromJson(json, ConsumeStatus.class);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/test/java/com/alibaba/rocketmq/common/protocol/MQProtosHelperTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/com/alibaba/rocketmq/common/protocol/MQProtosHelperTest.java b/common/src/test/java/com/alibaba/rocketmq/common/protocol/MQProtosHelperTest.java
deleted file mode 100644
index 749e7df..0000000
--- a/common/src/test/java/com/alibaba/rocketmq/common/protocol/MQProtosHelperTest.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package com.alibaba.rocketmq.common.protocol;
-
-/**
- * @author shijia.wxr
- */
-public class MQProtosHelperTest {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/test/java/org/apache/rocketmq/common/MixAllTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/rocketmq/common/MixAllTest.java b/common/src/test/java/org/apache/rocketmq/common/MixAllTest.java
new file mode 100644
index 0000000..6e7c17b
--- /dev/null
+++ b/common/src/test/java/org/apache/rocketmq/common/MixAllTest.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.common;
+
+import junit.framework.Assert;
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.util.List;
+
+
+/**
+ * @author lansheng.zj
+ */
+public class MixAllTest {
+    // The address list must contain the loopback address and this host's own address.
+    @Test
+    public void test() throws Exception {
+        final List<String> addresses = MixAll.getLocalInetAddress();
+        final String hostAddress = InetAddress.getLocalHost().getHostAddress();
+        Assert.assertTrue(addresses.contains("127.0.0.1"));
+        Assert.assertTrue(addresses.contains(hostAddress));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/test/java/org/apache/rocketmq/common/RemotingUtilTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/rocketmq/common/RemotingUtilTest.java b/common/src/test/java/org/apache/rocketmq/common/RemotingUtilTest.java
new file mode 100644
index 0000000..b7509b1
--- /dev/null
+++ b/common/src/test/java/org/apache/rocketmq/common/RemotingUtilTest.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.common;
+
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.junit.Test;
+
+
+public class RemotingUtilTest {
+    @Test
+    public void test() throws Exception {
+        // Smoke check: fetch the local address and print it; nothing is asserted.
+        System.out.println(RemotingUtil.getLocalAddress());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java b/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
new file mode 100644
index 0000000..decd3d0
--- /dev/null
+++ b/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.common;
+
+import org.junit.Test;
+
+import java.net.URL;
+import java.util.Properties;
+
+import static org.junit.Assert.assertTrue;
+
+
+public class UtilAllTest {
+    // Smoke test: prints UtilAll.currentStackTrace(); nothing is asserted.
+    @Test
+    public void test_currentStackTrace() {
+        System.out.println(UtilAll.currentStackTrace());
+    }
+
+    // Prints the code-source location of this test class, then its path.
+    @Test
+    public void test_a() {
+        URL url = this.getClass().getProtectionDomain().getCodeSource().getLocation();
+        System.out.println(url);
+        System.out.println(url.getPath());
+    }
+
+    // Calls MixAll.properties2Object with empty Properties; passes if nothing is thrown.
+    @Test
+    public void test_resetClassProperties() {
+        DemoConfig demoConfig = new DemoConfig();
+        MixAll.properties2Object(new Properties(), demoConfig);
+    }
+
+    // Converts a DemoConfig via MixAll.object2Properties and prints its string form.
+    @Test
+    public void test_properties2String() {
+        DemoConfig demoConfig = new DemoConfig();
+        Properties properties = MixAll.object2Properties(demoConfig);
+        System.out.println(MixAll.properties2String(properties));
+    }
+
+    // Smoke test: prints UtilAll.timeMillisToHumanString(); nothing is asserted.
+    @Test
+    public void test_timeMillisToHumanString() {
+        System.out.println(UtilAll.timeMillisToHumanString());
+    }
+
+    // Two Properties objects holding the same two entries must be reported as equal.
+    @Test
+    public void test_isPropertiesEqual() {
+        final Properties p1 = new Properties();
+        final Properties p2 = new Properties();
+
+        p1.setProperty("a", "1");
+        p1.setProperty("b", "2");
+
+        p2.setProperty("a", "1");
+        p2.setProperty("b", "2");
+        // p2.setProperty("c", "3");
+
+        assertTrue(MixAll.isPropertiesEqual(p1, p2));
+    }
+
+    // The PID returned by UtilAll.getPid() must be positive.
+    @Test
+    public void test_getpid() {
+        int pid = UtilAll.getPid();
+
+        System.out.println("PID = " + pid);
+        assertTrue(pid > 0);
+    }
+
+    // Only the whitespace-only input is expected to be reported as blank.
+    @Test
+    public void test_isBlank() {
+        {
+            boolean result = UtilAll.isBlank("Hello ");
+            assertTrue(!result);
+        }
+
+        {
+            boolean result = UtilAll.isBlank(" Hello");
+            assertTrue(!result);
+        }
+
+        {
+            boolean result = UtilAll.isBlank("He llo");
+            assertTrue(!result);
+        }
+
+        {
+            boolean result = UtilAll.isBlank("  ");
+            assertTrue(result);
+        }
+
+        {
+            boolean result = UtilAll.isBlank("Hello");
+            assertTrue(!result);
+        }
+    }
+    // Bean fixture with getter/setter pairs, passed to the MixAll conversion helpers above.
+    static class DemoConfig {
+        private int demoWidth = 0;
+        private int demoLength = 0;
+        private boolean demoOK = false;
+        private String demoName = "haha";
+
+
+        public int getDemoWidth() {
+            return demoWidth;
+        }
+
+
+        public void setDemoWidth(int demoWidth) {
+            this.demoWidth = demoWidth;
+        }
+
+
+        public int getDemoLength() {
+            return demoLength;
+        }
+
+
+        public void setDemoLength(int demoLength) {
+            this.demoLength = demoLength;
+        }
+
+
+        public boolean isDemoOK() {
+            return demoOK;
+        }
+
+
+        public void setDemoOK(boolean demoOK) {
+            this.demoOK = demoOK;
+        }
+
+
+        public String getDemoName() {
+            return demoName;
+        }
+
+        // Was misspelled "setDemoNfieldame", leaving demoName with no setter matching getDemoName.
+        public void setDemoName(String demoName) {
+            this.demoName = demoName;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/test/java/org/apache/rocketmq/common/filter/FilterAPITest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/rocketmq/common/filter/FilterAPITest.java b/common/src/test/java/org/apache/rocketmq/common/filter/FilterAPITest.java
new file mode 100644
index 0000000..b67c59a
--- /dev/null
+++ b/common/src/test/java/org/apache/rocketmq/common/filter/FilterAPITest.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.common.filter;
+
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.junit.Test;
+
+
+/**
+ * @author shijia.wxr
+ *
+ */
+public class FilterAPITest {
+    // Builds subscription data from a three-tag expression and prints it; nothing is asserted.
+    @Test
+    public void testBuildSubscriptionData() throws Exception {
+        final SubscriptionData built =
+                FilterAPI.buildSubscriptionData("ConsumerGroup1", "TestTopic", "TAG1 || Tag2 || tag3");
+        System.out.println(built);
+    }
+    // Sets a filter class source on the built data and prints its JSON form; nothing is asserted.
+    @Test
+    public void testSubscriptionData() throws Exception {
+        final SubscriptionData built =
+                FilterAPI.buildSubscriptionData("ConsumerGroup1", "TestTopic", "TAG1 || Tag2 || tag3");
+        built.setFilterClassSource("java hello");
+        final String encoded = RemotingSerializable.toJson(built, true);
+        System.out.println(encoded);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/test/java/org/apache/rocketmq/common/filter/PolishExprTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/rocketmq/common/filter/PolishExprTest.java b/common/src/test/java/org/apache/rocketmq/common/filter/PolishExprTest.java
new file mode 100644
index 0000000..f2ba2a3
--- /dev/null
+++ b/common/src/test/java/org/apache/rocketmq/common/filter/PolishExprTest.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.common.filter;
+
+import org.apache.rocketmq.common.filter.impl.Op;
+import org.apache.rocketmq.common.filter.impl.PolishExpr;
+import junit.framework.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+
+/**
+ * @author lansheng.zj
+ */
+public class PolishExprTest {
+    // The fixture is built in the field initializer: init() has no @Before, so nothing calls it.
+    private String expression = "tag1||(tag2&&tag3)&&tag4||tag5&&(tag6 && tag7)|| tag8    && tag9";
+    private PolishExpr polishExpr = new PolishExpr();
+
+    // Kept for any existing caller; swaps in a fresh PolishExpr instance.
+    public void init() {
+        polishExpr = new PolishExpr();
+    }
+
+    // Converts the sample expression and prints the resulting Op list; nothing is asserted.
+    @Test
+    public void testReversePolish() {
+        List<Op> antiPolishExpression = polishExpr.reversePolish(expression);
+        System.out.println(antiPolishExpression);
+    }
+
+    // Times 100000 conversions after an equally long warm-up loop; fails at 500 ms or more.
+    @Test
+    public void testReversePolish_Performance() {
+        // prepare
+        for (int i = 0; i < 100000; i++) {
+            polishExpr.reversePolish(expression);
+        }
+
+        long start = System.currentTimeMillis();
+        for (int i = 0; i < 100000; i++) {
+            polishExpr.reversePolish(expression);
+        }
+        long cost = System.currentTimeMillis() - start;
+        System.out.println(cost);
+        // NOTE(review): wall-clock threshold; may be flaky on slow or heavily loaded machines.
+
+        Assert.assertTrue(cost < 500);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/test/java/org/apache/rocketmq/common/protocol/ConsumeStatusTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/rocketmq/common/protocol/ConsumeStatusTest.java b/common/src/test/java/org/apache/rocketmq/common/protocol/ConsumeStatusTest.java
new file mode 100644
index 0000000..79c6bbf
--- /dev/null
+++ b/common/src/test/java/org/apache/rocketmq/common/protocol/ConsumeStatusTest.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol;
+
+import org.apache.rocketmq.common.protocol.body.ConsumeStatus;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.junit.Test;
+
+
+public class ConsumeStatusTest {
+    // Encodes a ConsumeStatus to JSON and decodes it again; passes if no exception is thrown.
+    @Test
+    public void decode_test() throws Exception {
+        final ConsumeStatus original = new ConsumeStatus();
+        original.setConsumeFailedTPS(0L);
+        final String encoded = RemotingSerializable.toJson(original, true);
+        System.out.println(encoded);
+        final ConsumeStatus decoded = RemotingSerializable.fromJson(encoded, ConsumeStatus.class);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/test/java/org/apache/rocketmq/common/protocol/MQProtosHelperTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/rocketmq/common/protocol/MQProtosHelperTest.java b/common/src/test/java/org/apache/rocketmq/common/protocol/MQProtosHelperTest.java
new file mode 100644
index 0000000..cd56627
--- /dev/null
+++ b/common/src/test/java/org/apache/rocketmq/common/protocol/MQProtosHelperTest.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol;
+
+/**
+ * @author shijia.wxr
+ */
+public class MQProtosHelperTest {
+    // Placeholder: no test cases are defined in this class.
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/example/pom.xml
----------------------------------------------------------------------
diff --git a/example/pom.xml b/example/pom.xml
index 8e68a58..53aa6a6 100644
--- a/example/pom.xml
+++ b/example/pom.xml
@@ -18,7 +18,7 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
-        <groupId>com.alibaba.rocketmq</groupId>
+        <groupId>org.apache.rocketmq</groupId>
         <artifactId>rocketmq-all</artifactId>
         <version>4.0.0-SNAPSHOT</version>
     </parent>