You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/19 09:40:41 UTC

[24/43] incubator-rocketmq git commit: Finish code dump. Reviewed by: @yukon @vongosling @stevenschew @vintagewang @lollipop @zander

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/UtilAll.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/UtilAll.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/UtilAll.java
new file mode 100644
index 0000000..4429e3d
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/UtilAll.java
@@ -0,0 +1,525 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.common;
+
+import com.alibaba.rocketmq.remoting.common.RemotingHelper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.RuntimeMXBean;
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.text.NumberFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.util.zip.CRC32;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class UtilAll {
+    public static final String YYYY_MM_DD_HH_MM_SS = "yyyy-MM-dd HH:mm:ss";
+    public static final String YYYY_MM_DD_HH_MM_SS_SSS = "yyyy-MM-dd#HH:mm:ss:SSS";
+    public static final String YYYY_MMDD_HHMMSS = "yyyyMMddHHmmss";
+
+
+    /**
+     * Returns the process id of the running JVM.
+     *
+     * The id is read from the runtime MXBean name, which is expected to look like "pid@hostname".
+     *
+     * @return the parsed pid, or -1 when the name cannot be parsed
+     */
+    public static int getPid() {
+        final String jvmName = ManagementFactory.getRuntimeMXBean().getName(); // format: "pid@hostname"
+        try {
+            final int atIndex = jvmName.indexOf('@');
+            final String pidText = jvmName.substring(0, atIndex);
+            return Integer.parseInt(pidText);
+        } catch (Exception e) {
+            // Missing '@' or a non-numeric prefix: report "unknown".
+            return -1;
+        }
+    }
+
+    /**
+     * Renders the stack trace of the calling thread as text.
+     *
+     * @return one "\n\t"-prefixed entry per stack frame, concatenated
+     */
+    public static String currentStackTrace() {
+        final StringBuilder buf = new StringBuilder();
+        for (StackTraceElement frame : Thread.currentThread().getStackTrace()) {
+            buf.append("\n\t").append(frame.toString());
+        }
+        return buf.toString();
+    }
+
+    /**
+     * Formats an offset as a zero-padded decimal string with at least 20 integer digits.
+     *
+     * The formatter is pinned to {@link Locale#ROOT} so that the result does not depend on the
+     * JVM default locale (some locales format numbers with non-Latin digits).
+     *
+     * @param offset the offset to format
+     * @return the offset without grouping separators or fraction digits, left-padded with zeros
+     */
+    public static String offset2FileName(final long offset) {
+        final NumberFormat nf = NumberFormat.getInstance(Locale.ROOT);
+        nf.setMinimumIntegerDigits(20);
+        nf.setMaximumFractionDigits(0);
+        nf.setGroupingUsed(false);
+        return nf.format(offset);
+    }
+
+    /**
+     * Computes how many milliseconds have passed since {@code beginTime}.
+     *
+     * @param beginTime start instant, in the same scale as {@link System#currentTimeMillis()}
+     * @return current time in milliseconds minus {@code beginTime}
+     */
+    public static long computeEclipseTimeMilliseconds(final long beginTime) {
+        final long now = System.currentTimeMillis();
+        return now - beginTime;
+    }
+
+
+    /**
+     * Tells whether the current hour of day is one of the hours listed in {@code when}.
+     *
+     * Entries are separated by ';'. Whitespace around an entry is ignored and empty entries
+     * are skipped, so inputs such as "3; 4" or ";4" are accepted.
+     *
+     * @param when semicolon-separated hour values, compared against {@link Calendar#HOUR_OF_DAY}
+     * @return true if any entry equals the current hour of day, false otherwise
+     * @throws NumberFormatException if a non-empty entry examined before a match is not an integer
+     */
+    public static boolean isItTimeToDo(final String when) {
+        final int currentHour = Calendar.getInstance().get(Calendar.HOUR_OF_DAY);
+        for (String token : when.split(";")) {
+            final String hourText = token.trim();
+            if (hourText.isEmpty()) {
+                continue;
+            }
+            if (Integer.parseInt(hourText) == currentHour) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+
+    /**
+     * Formats the current time as "yyyyMMddHHmmssSSS" digits.
+     *
+     * @return the formatted current time
+     */
+    public static String timeMillisToHumanString() {
+        final long now = System.currentTimeMillis();
+        return timeMillisToHumanString(now);
+    }
+
+
+    /**
+     * Formats a timestamp as year, month, day, hour, minute, second and millisecond digits
+     * with no separators, using the default calendar.
+     *
+     * @param t timestamp in milliseconds
+     * @return the formatted timestamp
+     */
+    public static String timeMillisToHumanString(final long t) {
+        final Calendar calendar = Calendar.getInstance();
+        calendar.setTimeInMillis(t);
+        final int year = calendar.get(Calendar.YEAR);
+        final int month = calendar.get(Calendar.MONTH) + 1; // Calendar months are zero-based
+        final int day = calendar.get(Calendar.DAY_OF_MONTH);
+        final int hour = calendar.get(Calendar.HOUR_OF_DAY);
+        final int minute = calendar.get(Calendar.MINUTE);
+        final int second = calendar.get(Calendar.SECOND);
+        final int millis = calendar.get(Calendar.MILLISECOND);
+        return String.format("%04d%02d%02d%02d%02d%02d%03d", year, month, day, hour, minute, second, millis);
+    }
+
+
+    /**
+     * Computes the timestamp of 00:00:00.000 on the day after today, in the default calendar.
+     *
+     * @return that instant in milliseconds
+     */
+    public static long computNextMorningTimeMillis() {
+        final Calendar midnight = Calendar.getInstance();
+        midnight.setTimeInMillis(System.currentTimeMillis());
+        midnight.add(Calendar.DAY_OF_MONTH, 1);
+        // Zero every field below the day.
+        final int[] clearedFields = {Calendar.HOUR_OF_DAY, Calendar.MINUTE, Calendar.SECOND, Calendar.MILLISECOND};
+        for (int field : clearedFields) {
+            midnight.set(field, 0);
+        }
+
+        return midnight.getTimeInMillis();
+    }
+
+
+    /**
+     * Computes the timestamp of the start (second 0, millisecond 0) of the next minute.
+     *
+     * @return that instant in milliseconds
+     */
+    public static long computNextMinutesTimeMillis() {
+        final Calendar nextMinute = Calendar.getInstance();
+        nextMinute.setTimeInMillis(System.currentTimeMillis());
+        nextMinute.add(Calendar.MINUTE, 1);
+        nextMinute.set(Calendar.SECOND, 0);
+        nextMinute.set(Calendar.MILLISECOND, 0);
+
+        return nextMinute.getTimeInMillis();
+    }
+
+
+    /**
+     * Computes the timestamp of the start (minute 0, second 0, millisecond 0) of the next hour.
+     *
+     * @return that instant in milliseconds
+     */
+    public static long computNextHourTimeMillis() {
+        final Calendar nextHour = Calendar.getInstance();
+        nextHour.setTimeInMillis(System.currentTimeMillis());
+        nextHour.add(Calendar.HOUR_OF_DAY, 1);
+        final int[] clearedFields = {Calendar.MINUTE, Calendar.SECOND, Calendar.MILLISECOND};
+        for (int field : clearedFields) {
+            nextHour.set(field, 0);
+        }
+
+        return nextHour.getTimeInMillis();
+    }
+
+
+    /**
+     * Computes the timestamp of minute 30 (second 0, millisecond 0) of the hour that follows
+     * the current one; for example, at 10:10 the result is 11:30.
+     *
+     * @return that instant in milliseconds
+     */
+    public static long computNextHalfHourTimeMillis() {
+        final Calendar halfPast = Calendar.getInstance();
+        halfPast.setTimeInMillis(System.currentTimeMillis());
+        halfPast.add(Calendar.HOUR_OF_DAY, 1);
+        halfPast.set(Calendar.MINUTE, 30);
+        halfPast.set(Calendar.SECOND, 0);
+        halfPast.set(Calendar.MILLISECOND, 0);
+
+        return halfPast.getTimeInMillis();
+    }
+
+
+    /**
+     * Formats a timestamp as "yyyy-MM-dd HH:mm:ss,SSS" using the default calendar.
+     *
+     * @param t timestamp in milliseconds
+     * @return the formatted timestamp
+     */
+    public static String timeMillisToHumanString2(final long t) {
+        final Calendar calendar = Calendar.getInstance();
+        calendar.setTimeInMillis(t);
+        final int year = calendar.get(Calendar.YEAR);
+        final int month = calendar.get(Calendar.MONTH) + 1; // Calendar months are zero-based
+        final int day = calendar.get(Calendar.DAY_OF_MONTH);
+        final int hour = calendar.get(Calendar.HOUR_OF_DAY);
+        final int minute = calendar.get(Calendar.MINUTE);
+        final int second = calendar.get(Calendar.SECOND);
+        final int millis = calendar.get(Calendar.MILLISECOND);
+        return String.format("%04d-%02d-%02d %02d:%02d:%02d,%03d", year, month, day, hour, minute, second, millis);
+    }
+
+
+    /**
+     * Formats a timestamp as "yyyyMMddHHmmss" digits (no milliseconds) using the default calendar.
+     *
+     * @param t timestamp in milliseconds
+     * @return the formatted timestamp
+     */
+    public static String timeMillisToHumanString3(final long t) {
+        final Calendar calendar = Calendar.getInstance();
+        calendar.setTimeInMillis(t);
+        final int year = calendar.get(Calendar.YEAR);
+        final int month = calendar.get(Calendar.MONTH) + 1; // Calendar months are zero-based
+        final int day = calendar.get(Calendar.DAY_OF_MONTH);
+        final int hour = calendar.get(Calendar.HOUR_OF_DAY);
+        final int minute = calendar.get(Calendar.MINUTE);
+        final int second = calendar.get(Calendar.SECOND);
+        return String.format("%04d%02d%02d%02d%02d%02d", year, month, day, hour, minute, second);
+    }
+
+
+    /**
+     * Computes the used fraction of the disk partition that holds {@code path}.
+     *
+     * If the path does not exist, an attempt is made to create it as a directory first.
+     * Despite the name, the result is a ratio (used / total), not a value scaled to 100.
+     *
+     * @param path file-system path to inspect
+     * @return (total - free) / total, or -1 if the path is null or empty, the total space is
+     *         not positive, or an exception occurs
+     */
+    public static double getDiskPartitionSpaceUsedPercent(final String path) {
+        if (path == null || path.isEmpty()) {
+            return -1;
+        }
+
+        try {
+            final File dir = new File(path);
+            if (!dir.exists()) {
+                // Best effort: the outcome of mkdirs() is not checked.
+                dir.mkdirs();
+            }
+
+            final long total = dir.getTotalSpace();
+            if (total <= 0) {
+                return -1;
+            }
+            final long free = dir.getFreeSpace();
+            return (total - free) / (double) total;
+        } catch (Exception e) {
+            return -1;
+        }
+    }
+
+
+    /**
+     * Computes the masked CRC32 of a whole array.
+     *
+     * @param array bytes to checksum; may be null
+     * @return 0 for a null array, otherwise the same value as {@code crc32(array, 0, array.length)}
+     */
+    public static final int crc32(byte[] array) {
+        return array == null ? 0 : crc32(array, 0, array.length);
+    }
+
+
+    /**
+     * Computes the CRC32 of a slice and clears the top bit so the result is never negative.
+     *
+     * @param array  bytes to checksum
+     * @param offset index of the first byte to include
+     * @param length number of bytes to include
+     * @return the low 31 bits of the CRC32 value
+     */
+    public static final int crc32(byte[] array, int offset, int length) {
+        final CRC32 checksum = new CRC32();
+        checksum.update(array, offset, length);
+        final long masked = checksum.getValue() & 0x7FFFFFFF;
+        return (int) masked;
+    }
+
+    /** Upper-case hexadecimal digits, indexed by nibble value. */
+    final static char[] HEX_ARRAY = "0123456789ABCDEF".toCharArray();
+
+    /**
+     * Encodes bytes as an upper-case hexadecimal string, two characters per byte.
+     *
+     * @param src bytes to encode; must not be null
+     * @return the hexadecimal text, high nibble first
+     */
+    public static String bytes2string(byte[] src) {
+        final StringBuilder hex = new StringBuilder(src.length * 2);
+        for (byte b : src) {
+            hex.append(HEX_ARRAY[(b >> 4) & 0x0F]);
+            hex.append(HEX_ARRAY[b & 0x0F]);
+        }
+        return hex.toString();
+    }
+
+    /**
+     * Decodes a hexadecimal string (either case) into bytes, two characters per byte.
+     *
+     * A trailing unpaired character is ignored. Characters that are not hexadecimal digits
+     * are not rejected; they decode to an unspecified byte value.
+     *
+     * @param hexString text to decode
+     * @return the decoded bytes, or null when the input is null or empty
+     */
+    public static byte[] string2bytes(String hexString) {
+        if (hexString == null || hexString.equals("")) {
+            return null;
+        }
+        final char[] upper = hexString.toUpperCase().toCharArray();
+        final byte[] decoded = new byte[upper.length / 2];
+        for (int i = 0, pos = 0; i < decoded.length; i++, pos += 2) {
+            final int high = charToByte(upper[pos]) << 4;
+            final int low = charToByte(upper[pos + 1]);
+            decoded[i] = (byte) (high | low);
+        }
+        return decoded;
+    }
+
+
+    /** Maps an upper-case hexadecimal digit to its value; yields -1 for any other character. */
+    private static byte charToByte(char c) {
+        final int nibble = "0123456789ABCDEF".indexOf(c);
+        return (byte) nibble;
+    }
+
+
+    /**
+     * Inflates data that was compressed in the format read by {@link InflaterInputStream}.
+     *
+     * @param src compressed bytes; must not be null
+     * @return the inflated bytes
+     * @throws IOException if reading the compressed stream fails
+     */
+    public static byte[] uncompress(final byte[] src) throws IOException {
+        final byte[] buffer = new byte[src.length];
+        final ByteArrayInputStream rawIn = new ByteArrayInputStream(src);
+        final InflaterInputStream inflaterIn = new InflaterInputStream(rawIn);
+        final ByteArrayOutputStream sink = new ByteArrayOutputStream(src.length);
+
+        try {
+            int read;
+            while ((read = inflaterIn.read(buffer, 0, buffer.length)) > 0) {
+                sink.write(buffer, 0, read);
+            }
+            sink.flush();
+            return sink.toByteArray();
+        } finally {
+            // Close failures are ignored; the three streams are closed independently.
+            try {
+                rawIn.close();
+            } catch (IOException ignored) {
+            }
+            try {
+                inflaterIn.close();
+            } catch (IOException ignored) {
+            }
+            try {
+                sink.close();
+            } catch (IOException ignored) {
+            }
+        }
+    }
+
+
+    /**
+     * Deflates data with the given compression level.
+     *
+     * @param src   bytes to compress; must not be null
+     * @param level compression level handed to {@link java.util.zip.Deflater}
+     * @return the compressed bytes
+     * @throws IOException if writing to the deflater stream fails
+     */
+    public static byte[] compress(final byte[] src, final int level) throws IOException {
+        final ByteArrayOutputStream sink = new ByteArrayOutputStream(src.length);
+        final java.util.zip.Deflater deflater = new java.util.zip.Deflater(level);
+        final DeflaterOutputStream deflaterOut = new DeflaterOutputStream(sink, deflater);
+        try {
+            deflaterOut.write(src);
+            deflaterOut.finish();
+            deflaterOut.close();
+            return sink.toByteArray();
+        } finally {
+            try {
+                sink.close();
+            } catch (IOException ignored) {
+            }
+
+            // The deflater was supplied by this method, so it is released here on every path.
+            deflater.end();
+        }
+    }
+
+
+    /**
+     * Parses a string as an int, falling back to a default on any failure.
+     *
+     * @param str          text to parse; may be null
+     * @param defaultValue value returned when parsing fails
+     * @return the parsed int, or {@code defaultValue}
+     */
+    public static int asInt(String str, int defaultValue) {
+        int parsed;
+        try {
+            parsed = Integer.parseInt(str);
+        } catch (Exception e) {
+            parsed = defaultValue;
+        }
+        return parsed;
+    }
+
+
+    /**
+     * Parses a string as a long, falling back to a default on any failure.
+     *
+     * @param str          text to parse; may be null
+     * @param defaultValue value returned when parsing fails
+     * @return the parsed long, or {@code defaultValue}
+     */
+    public static long asLong(String str, long defaultValue) {
+        long parsed;
+        try {
+            parsed = Long.parseLong(str);
+        } catch (Exception e) {
+            parsed = defaultValue;
+        }
+        return parsed;
+    }
+
+
+    /**
+     * Formats a date with a {@link SimpleDateFormat} pattern.
+     *
+     * @param date    date to format
+     * @param pattern {@link SimpleDateFormat} pattern
+     * @return the formatted text
+     */
+    public static String formatDate(Date date, String pattern) {
+        return new SimpleDateFormat(pattern).format(date);
+    }
+
+
+    /**
+     * Parses text into a date with a {@link SimpleDateFormat} pattern.
+     *
+     * @param date    text to parse
+     * @param pattern {@link SimpleDateFormat} pattern
+     * @return the parsed date, or null when the text does not match the pattern
+     */
+    public static Date parseDate(String date, String pattern) {
+        try {
+            return new SimpleDateFormat(pattern).parse(date);
+        } catch (ParseException e) {
+            return null;
+        }
+    }
+
+
+    /**
+     * Converts a response code to its decimal text.
+     *
+     * @param code numeric code
+     * @return the decimal representation of {@code code}
+     */
+    public static String responseCode2String(final int code) {
+        return String.valueOf(code);
+    }
+
+
+    /**
+     * Truncates a string to at most {@code size} leading characters.
+     *
+     * @param str  text to truncate; may be null
+     * @param size maximum number of characters to keep
+     * @return {@code str} unchanged when it is null or not longer than {@code size},
+     *         otherwise its first {@code size} characters
+     */
+    public static String frontStringAtLeast(final String str, final int size) {
+        if (str == null || str.length() <= size) {
+            return str;
+        }
+
+        return str.substring(0, size);
+    }
+
+
+    public static boolean isBlank(String str) {
+        int strLen;
+        if (str == null || (strLen = str.length()) == 0) {
+            return true;
+        }
+        for (int i = 0; i < strLen; i++) {
+            if (!Character.isWhitespace(str.charAt(i))) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+
+    /**
+     * Renders the stack traces of all live threads as text.
+     *
+     * @return the dump produced by {@link #jstack(Map)} for {@link Thread#getAllStackTraces()}
+     */
+    public static String jstack() {
+        final Map<Thread, StackTraceElement[]> traces = Thread.getAllStackTraces();
+        return jstack(traces);
+    }
+
+
+    /**
+     * Renders the given thread stack traces as text.
+     *
+     * Each thread with a non-empty trace gets a header line (name, id, state) followed by one
+     * line per frame, every line starting with the thread name padded to 40 characters.
+     * Threads with a null or empty trace are skipped. If anything is thrown while rendering,
+     * a description of it is appended to what was produced so far.
+     *
+     * @param map stack traces keyed by thread
+     * @return the rendered dump
+     */
+    public static String jstack(Map<Thread, StackTraceElement[]> map) {
+        final StringBuilder dump = new StringBuilder();
+        try {
+            for (Map.Entry<Thread, StackTraceElement[]> entry : map.entrySet()) {
+                final StackTraceElement[] frames = entry.getValue();
+                if (frames == null || frames.length == 0) {
+                    continue;
+                }
+                final Thread thread = entry.getKey();
+                final String threadName = thread.getName();
+                dump.append(String.format("%-40sTID: %d STATE: %s%n", threadName, thread.getId(), thread.getState()));
+                for (StackTraceElement frame : frames) {
+                    dump.append(String.format("%-40s%s%n", threadName, frame.toString()));
+                }
+                dump.append("\n");
+            }
+        } catch (Throwable e) {
+            dump.append(RemotingHelper.exceptionSimpleDesc(e));
+        }
+
+        return dump.toString();
+    }
+
+    public static boolean isInternalIP(byte[] ip) {
+        if (ip.length != 4) {
+            throw new RuntimeException("illegal ipv4 bytes");
+        }
+
+
+        //10.0.0.0~10.255.255.255
+        //172.16.0.0~172.31.255.255
+        //192.168.0.0~192.168.255.255
+        if (ip[0] == (byte) 10) {
+
+            return true;
+        } else if (ip[0] == (byte) 172) {
+            if (ip[1] >= (byte) 16 && ip[1] <= (byte) 31) {
+                return true;
+            }
+        } else if (ip[0] == (byte) 192) {
+            if (ip[1] == (byte) 168) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private static boolean ipCheck(byte[] ip) {
+        if (ip.length != 4) {
+            throw new RuntimeException("illegal ipv4 bytes");
+        }
+
+//        if (ip[0] == (byte)30 && ip[1] == (byte)10 && ip[2] == (byte)163 && ip[3] == (byte)120) {
+//        }
+
+
+        if (ip[0] >= (byte) 1 && ip[0] <= (byte) 126) {
+            if (ip[1] == (byte) 1 && ip[2] == (byte) 1 && ip[3] == (byte) 1) {
+                return false;
+            }
+            if (ip[1] == (byte) 0 && ip[2] == (byte) 0 && ip[3] == (byte) 0) {
+                return false;
+            }
+            return true;
+        } else if (ip[0] >= (byte) 128 && ip[0] <= (byte) 191) {
+            if (ip[2] == (byte) 1 && ip[3] == (byte) 1) {
+                return false;
+            }
+            if (ip[2] == (byte) 0 && ip[3] == (byte) 0) {
+                return false;
+            }
+            return true;
+        } else if (ip[0] >= (byte) 192 && ip[0] <= (byte) 223) {
+            if (ip[3] == (byte) 1) {
+                return false;
+            }
+            if (ip[3] == (byte) 0) {
+                return false;
+            }
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Renders four address bytes in dotted-decimal notation.
+     *
+     * @param ip the four address bytes, most significant first
+     * @return text such as "192.168.0.1", or null when {@code ip} does not have exactly four bytes
+     */
+    public static String ipToIPv4Str(byte[] ip) {
+        if (ip.length != 4) {
+            return null;
+        }
+        final StringBuilder dotted = new StringBuilder();
+        for (int i = 0; i < 4; i++) {
+            if (i > 0) {
+                dotted.append(".");
+            }
+            dotted.append(ip[i] & 0xFF); // mask to read the byte as unsigned
+        }
+        return dotted.toString();
+    }
+
+    /**
+     * Picks a local IPv4 address by scanning every network interface.
+     *
+     * The first address that passes {@code ipCheck} and is not internal according to
+     * {@code isInternalIP} is returned immediately. If there is none, the first internal
+     * address that passed {@code ipCheck} is returned.
+     *
+     * @return the four bytes of the chosen address
+     * @throws RuntimeException if no address qualifies or the scan fails
+     */
+    public static byte[] getIP() {
+        try {
+            byte[] internalIP = null;
+            final Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
+            while (interfaces.hasMoreElements()) {
+                final Enumeration<InetAddress> addresses = interfaces.nextElement().getInetAddresses();
+                while (addresses.hasMoreElements()) {
+                    final InetAddress address = addresses.nextElement();
+                    if (!(address instanceof Inet4Address)) {
+                        continue;
+                    }
+                    final byte[] ipByte = address.getAddress();
+                    if (ipByte.length != 4 || !ipCheck(ipByte)) {
+                        continue;
+                    }
+                    if (!isInternalIP(ipByte)) {
+                        return ipByte;
+                    }
+                    if (internalIP == null) {
+                        // Remember the first internal candidate as a fallback.
+                        internalIP = ipByte;
+                    }
+                }
+            }
+            if (internalIP == null) {
+                throw new RuntimeException("Can not get local ip");
+            }
+            return internalIP;
+        } catch (Exception e) {
+            throw new RuntimeException("Can not get local ip", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/admin/ConsumeStats.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/admin/ConsumeStats.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/admin/ConsumeStats.java
new file mode 100644
index 0000000..d8c9311
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/admin/ConsumeStats.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.common.admin;
+
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+
+/**
+ * Holds a per-queue offset table together with a consume TPS value.
+ *
+ * @author shijia.wxr
+ */
+public class ConsumeStats extends RemotingSerializable {
+    private HashMap<MessageQueue, OffsetWrapper> offsetTable = new HashMap<MessageQueue, OffsetWrapper>();
+    private double consumeTps = 0;
+
+
+    /**
+     * Sums, over every entry of the offset table, the broker offset minus the consumer offset.
+     *
+     * @return the accumulated difference; 0 for an empty table
+     */
+    public long computeTotalDiff() {
+        long total = 0L;
+        for (OffsetWrapper wrapper : this.offsetTable.values()) {
+            total += wrapper.getBrokerOffset() - wrapper.getConsumerOffset();
+        }
+
+        return total;
+    }
+
+
+    /** @return the offset table held by this object (not a copy) */
+    public HashMap<MessageQueue, OffsetWrapper> getOffsetTable() {
+        return this.offsetTable;
+    }
+
+
+    /** @param offsetTable the offset table to hold */
+    public void setOffsetTable(HashMap<MessageQueue, OffsetWrapper> offsetTable) {
+        this.offsetTable = offsetTable;
+    }
+
+
+    /** @return the stored consume TPS value */
+    public double getConsumeTps() {
+        return this.consumeTps;
+    }
+
+
+    /** @param consumeTps the consume TPS value to store */
+    public void setConsumeTps(double consumeTps) {
+        this.consumeTps = consumeTps;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/admin/OffsetWrapper.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/admin/OffsetWrapper.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/admin/OffsetWrapper.java
new file mode 100644
index 0000000..07785c2
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/admin/OffsetWrapper.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.common.admin;
+
+/**
+ * Plain holder for a broker offset, a consumer offset and a last timestamp.
+ *
+ * @author shijia.wxr
+ */
+public class OffsetWrapper {
+    private long brokerOffset;
+    private long consumerOffset;
+    private long lastTimestamp;
+
+
+    /** @return the stored broker offset */
+    public long getBrokerOffset() {
+        return this.brokerOffset;
+    }
+
+
+    /** @param brokerOffset the broker offset to store */
+    public void setBrokerOffset(long brokerOffset) {
+        this.brokerOffset = brokerOffset;
+    }
+
+
+    /** @return the stored consumer offset */
+    public long getConsumerOffset() {
+        return this.consumerOffset;
+    }
+
+
+    /** @param consumerOffset the consumer offset to store */
+    public void setConsumerOffset(long consumerOffset) {
+        this.consumerOffset = consumerOffset;
+    }
+
+
+    /** @return the stored last timestamp */
+    public long getLastTimestamp() {
+        return this.lastTimestamp;
+    }
+
+
+    /** @param lastTimestamp the last timestamp to store */
+    public void setLastTimestamp(long lastTimestamp) {
+        this.lastTimestamp = lastTimestamp;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/admin/RollbackStats.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/admin/RollbackStats.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/admin/RollbackStats.java
new file mode 100644
index 0000000..03d94a2
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/admin/RollbackStats.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.admin;
+
+/**
+ * Plain holder for a broker name, a queue id and four offset values
+ * (broker, consumer, timestamp and rollback).
+ *
+ * @author manhong.yqd
+ */
+public class RollbackStats {
+    private String brokerName;
+    private long queueId;
+    private long brokerOffset;
+    private long consumerOffset;
+    private long timestampOffset;
+    private long rollbackOffset;
+
+
+    /** @return the stored broker name */
+    public String getBrokerName() {
+        return this.brokerName;
+    }
+
+
+    /** @param brokerName the broker name to store */
+    public void setBrokerName(String brokerName) {
+        this.brokerName = brokerName;
+    }
+
+
+    /** @return the stored queue id */
+    public long getQueueId() {
+        return this.queueId;
+    }
+
+
+    /** @param queueId the queue id to store */
+    public void setQueueId(long queueId) {
+        this.queueId = queueId;
+    }
+
+
+    /** @return the stored broker offset */
+    public long getBrokerOffset() {
+        return this.brokerOffset;
+    }
+
+
+    /** @param brokerOffset the broker offset to store */
+    public void setBrokerOffset(long brokerOffset) {
+        this.brokerOffset = brokerOffset;
+    }
+
+
+    /** @return the stored consumer offset */
+    public long getConsumerOffset() {
+        return this.consumerOffset;
+    }
+
+
+    /** @param consumerOffset the consumer offset to store */
+    public void setConsumerOffset(long consumerOffset) {
+        this.consumerOffset = consumerOffset;
+    }
+
+
+    /** @return the stored timestamp offset */
+    public long getTimestampOffset() {
+        return this.timestampOffset;
+    }
+
+
+    /** @param timestampOffset the timestamp offset to store */
+    public void setTimestampOffset(long timestampOffset) {
+        this.timestampOffset = timestampOffset;
+    }
+
+
+    /** @return the stored rollback offset */
+    public long getRollbackOffset() {
+        return this.rollbackOffset;
+    }
+
+
+    /** @param rollbackOffset the rollback offset to store */
+    public void setRollbackOffset(long rollbackOffset) {
+        this.rollbackOffset = rollbackOffset;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/admin/TopicOffset.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/admin/TopicOffset.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/admin/TopicOffset.java
new file mode 100644
index 0000000..076d6eb
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/admin/TopicOffset.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.common.admin;
+
+/**
+ * Plain holder for a minimum offset, a maximum offset and a last-update timestamp.
+ *
+ * @author shijia.wxr
+ */
+public class TopicOffset {
+    private long minOffset;
+    private long maxOffset;
+    private long lastUpdateTimestamp;
+
+
+    /** @return the stored minimum offset */
+    public long getMinOffset() {
+        return this.minOffset;
+    }
+
+
+    /** @param minOffset the minimum offset to store */
+    public void setMinOffset(long minOffset) {
+        this.minOffset = minOffset;
+    }
+
+
+    /** @return the stored maximum offset */
+    public long getMaxOffset() {
+        return this.maxOffset;
+    }
+
+
+    /** @param maxOffset the maximum offset to store */
+    public void setMaxOffset(long maxOffset) {
+        this.maxOffset = maxOffset;
+    }
+
+
+    /** @return the stored last-update timestamp */
+    public long getLastUpdateTimestamp() {
+        return this.lastUpdateTimestamp;
+    }
+
+
+    /** @param lastUpdateTimestamp the last-update timestamp to store */
+    public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
+        this.lastUpdateTimestamp = lastUpdateTimestamp;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/admin/TopicStatsTable.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/admin/TopicStatsTable.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/admin/TopicStatsTable.java
new file mode 100644
index 0000000..12d1d4b
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/admin/TopicStatsTable.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.common.admin;
+
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.HashMap;
+
+
+/**
+ * Holds a table that maps each message queue to its {@link TopicOffset}.
+ *
+ * @author shijia.wxr
+ */
+public class TopicStatsTable extends RemotingSerializable {
+    private HashMap<MessageQueue, TopicOffset> offsetTable = new HashMap<MessageQueue, TopicOffset>();
+
+
+    /** @return the offset table held by this object (not a copy) */
+    public HashMap<MessageQueue, TopicOffset> getOffsetTable() {
+        return this.offsetTable;
+    }
+
+
+    /** @param offsetTable the offset table to hold */
+    public void setOffsetTable(HashMap<MessageQueue, TopicOffset> offsetTable) {
+        this.offsetTable = offsetTable;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/annotation/ImportantField.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/annotation/ImportantField.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/annotation/ImportantField.java
new file mode 100644
index 0000000..fe0cb12
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/annotation/ImportantField.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.common.annotation;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Member-less marker annotation, retained at runtime, that may be placed on fields,
+ * methods, parameters and local variables.
+ */
+@Target({ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER, ElementType.LOCAL_VARIABLE})
+@Retention(RetentionPolicy.RUNTIME)
+@Documented
+public @interface ImportantField {
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/constant/DBMsgConstants.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/constant/DBMsgConstants.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/constant/DBMsgConstants.java
new file mode 100644
index 0000000..54bc04d
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/constant/DBMsgConstants.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.constant;
+
+/**
+ * Size limit constant.
+ * NOTE(review): the class name suggests it applies to messages stored in a database; confirm
+ * against the users of this constant.
+ */
+public class DBMsgConstants {
+    /**
+     * Maximum body size in bytes: 64 MiB (64 * 1024 * 1024 = 67,108,864).
+     *
+     * The last factor used to be mistyped as 1204, which produced 78,905,344 bytes (~75 MiB).
+     * NOTE(review): the old trailing comment said "64KB", which matched neither value; if
+     * 64 KiB was really intended the expression should be 64 * 1024 instead.
+     */
+    public static final int MAX_BODY_SIZE = 64 * 1024 * 1024;
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/constant/LoggerName.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/constant/LoggerName.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/constant/LoggerName.java
new file mode 100644
index 0000000..9175669
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/constant/LoggerName.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.common.constant;
+
+/**
+ * String constants naming loggers. Every value starts with the prefix "Rocketmq" followed by
+ * a component or concern name (broker, client, store, ...).
+ *
+ * NOTE(review): presumably these strings are handed to the logging framework to look up the
+ * matching logger and must agree with the names in the logging configuration; confirm there
+ * before changing any value.
+ *
+ * @author shijia.wxr
+ */
+public class LoggerName {
+    public static final String FILTERSRV_LOGGER_NAME = "RocketmqFiltersrv";
+    public static final String NAMESRV_LOGGER_NAME = "RocketmqNamesrv";
+    public static final String BROKER_LOGGER_NAME = "RocketmqBroker";
+    public static final String CLIENT_LOGGER_NAME = "RocketmqClient";
+    public static final String TOOLS_LOGGER_NAME = "RocketmqTools";
+    public static final String COMMON_LOGGER_NAME = "RocketmqCommon";
+    public static final String STORE_LOGGER_NAME = "RocketmqStore";
+    public static final String STORE_ERROR_LOGGER_NAME = "RocketmqStoreError";
+    public static final String TRANSACTION_LOGGER_NAME = "RocketmqTransaction";
+    public static final String REBALANCE_LOCK_LOGGER_NAME = "RocketmqRebalanceLock";
+    public static final String ROCKETMQ_STATS_LOGGER_NAME = "RocketmqStats";
+    public static final String COMMERCIAL_LOGGER_NAME = "RocketmqCommercial";
+    public static final String FLOW_CONTROL_LOGGER_NAME = "RocketmqFlowControl";
+    public static final String ROCKETMQ_AUTHORIZE_LOGGER_NAME = "RocketmqAuthorize";
+    public static final String DUPLICATION_LOGGER_NAME = "RocketmqDuplication";
+    public static final String PROTECTION_LOGGER_NAME = "RocketmqProtection";
+    public static final String WATER_MARK_LOGGER_NAME = "RocketmqWaterMark";
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/constant/PermName.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/constant/PermName.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/constant/PermName.java
new file mode 100644
index 0000000..95c2510
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/constant/PermName.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.common.constant;
+
+/**
+ * Permission bit flags plus helpers to test them and to render them as text.
+ *
+ * Each flag occupies a single bit, so several permissions can be OR-ed into one int.
+ *
+ * @author shijia.wxr
+ */
+public class PermName {
+    public static final int PERM_PRIORITY = 0x1 << 3;
+    public static final int PERM_READ = 0x1 << 2;
+    public static final int PERM_WRITE = 0x1 << 1;
+    public static final int PERM_INHERIT = 0x1 << 0;
+
+    /**
+     * Renders the read/write/inherit bits of {@code perm} as a three-character string.
+     * Position 0 is 'R', position 1 is 'W', position 2 is 'X' (inherit); a cleared bit is
+     * shown as '-'. The priority bit is not rendered.
+     *
+     * @param perm permission bit set
+     * @return a string such as "RW-" or "---"
+     */
+    public static String perm2String(final int perm) {
+        final char[] flags = {
+            isReadable(perm) ? 'R' : '-',
+            isWriteable(perm) ? 'W' : '-',
+            isInherited(perm) ? 'X' : '-'
+        };
+        return new String(flags);
+    }
+
+    /**
+     * @param perm permission bit set
+     * @return true when the read bit is set
+     */
+    public static boolean isReadable(final int perm) {
+        // PERM_READ is a single bit, so "non-zero" is the same as "equals the mask".
+        return (perm & PERM_READ) != 0;
+    }
+
+    /**
+     * @param perm permission bit set
+     * @return true when the write bit is set
+     */
+    public static boolean isWriteable(final int perm) {
+        return (perm & PERM_WRITE) != 0;
+    }
+
+    /**
+     * @param perm permission bit set
+     * @return true when the inherit bit is set
+     */
+    public static boolean isInherited(final int perm) {
+        return (perm & PERM_INHERIT) != 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/consumer/ConsumeFromWhere.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/consumer/ConsumeFromWhere.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/consumer/ConsumeFromWhere.java
new file mode 100644
index 0000000..ededc90
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/consumer/ConsumeFromWhere.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.common.consumer;
+
+/**
+ * Enumerates the positions a consumer can be configured to start consuming from.
+ *
+ * NOTE(review): the precise meaning of each constant is defined by the consumer code that
+ * switches on this enum, which is not part of this file; confirm there before relying on
+ * what the names suggest.
+ *
+ * @author shijia.wxr
+ */
+public enum ConsumeFromWhere {
+    CONSUME_FROM_LAST_OFFSET,
+
+    // The next three constants are marked deprecated but remain declared, so existing code
+    // that references them still compiles and the ordinals of later constants do not shift.
+    @Deprecated
+    CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST,
+    @Deprecated
+    CONSUME_FROM_MIN_OFFSET,
+    @Deprecated
+    CONSUME_FROM_MAX_OFFSET,
+    CONSUME_FROM_FIRST_OFFSET,
+    CONSUME_FROM_TIMESTAMP,
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/FilterAPI.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/FilterAPI.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/FilterAPI.java
new file mode 100644
index 0000000..2b26b83
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/FilterAPI.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.common.filter;
+
+import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
+
+import java.net.URL;
+
+
+/**
+ * Static helpers for locating filter class sources and for turning a tag expression into a
+ * {@link SubscriptionData}.
+ *
+ * @author shijia.wxr
+ */
+public class FilterAPI {
+    /**
+     * Looks up the resource "&lt;SimpleName&gt;.java" through this class's class loader.
+     *
+     * @param className simple or fully qualified class name
+     * @return URL of the resource, or null when the class loader cannot find it
+     */
+    public static URL classFile(final String className) {
+        return FilterAPI.class.getClassLoader().getResource(simpleClassName(className) + ".java");
+    }
+
+    /**
+     * Strips the package prefix from a class name.
+     *
+     * @param className simple or fully qualified class name
+     * @return the part after the last '.', or the input itself when it contains no '.'
+     */
+    public static String simpleClassName(final String className) {
+        final int lastDot = className.lastIndexOf(".");
+        return lastDot < 0 ? className : className.substring(lastDot + 1);
+    }
+
+    /**
+     * Builds the subscription for {@code topic} from a "||"-separated tag expression.
+     *
+     * A null or empty expression, or one equal to {@link SubscriptionData#SUB_ALL}, is stored
+     * as {@code SUB_ALL} with no tags. Otherwise every non-blank tag is trimmed and added to
+     * the tag set, and its {@code hashCode()} to the code set.
+     *
+     * @param consumerGroup not used by this method
+     * @param topic topic to store in the result
+     * @param subString tag expression, e.g. "TagA || TagB"
+     * @return the populated subscription data
+     * @throws Exception when splitting yields no elements, i.e. the expression consists of
+     *         separators only
+     */
+    public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,
+                                                         String subString) throws Exception {
+        final SubscriptionData result = new SubscriptionData();
+        result.setTopic(topic);
+        result.setSubString(subString);
+
+        final boolean subscribeAll =
+                null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0;
+        if (subscribeAll) {
+            result.setSubString(SubscriptionData.SUB_ALL);
+            return result;
+        }
+
+        final String[] parts = subString.split("\\|\\|");
+        if (parts == null || parts.length == 0) {
+            throw new Exception("subString split error");
+        }
+
+        for (final String part : parts) {
+            // Trimming an empty string yields an empty string, so one length check suffices.
+            final String tag = part.trim();
+            if (tag.length() > 0) {
+                result.getTagsSet().add(tag);
+                result.getCodeSet().add(tag.hashCode());
+            }
+        }
+
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/FilterContext.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/FilterContext.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/FilterContext.java
new file mode 100644
index 0000000..50cc5fc
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/FilterContext.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.filter;
+
+/**
+ * Mutable holder for the context handed to {@code MessageFilter#match}; at present it only
+ * carries the consumer group.
+ */
+public class FilterContext {
+    // Consumer group this context belongs to; null until the setter is called.
+    private String consumerGroup;
+
+
+    /**
+     * @return the consumer group last set, or null if none was set
+     */
+    public String getConsumerGroup() {
+        return consumerGroup;
+    }
+
+
+    /**
+     * @param consumerGroup consumer group to store; no validation is performed
+     */
+    public void setConsumerGroup(String consumerGroup) {
+        this.consumerGroup = consumerGroup;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/MessageFilter.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/MessageFilter.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/MessageFilter.java
new file mode 100644
index 0000000..8a1252e
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/MessageFilter.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.filter;
+
+import com.alibaba.rocketmq.common.message.MessageExt;
+
+
+/**
+ * Callback that decides, per message, whether the message matches a filter.
+ */
+public interface MessageFilter {
+    /**
+     * Tests one message against this filter.
+     *
+     * @param msg the message to test
+     * @param context additional information for the decision (currently the consumer group)
+     * @return the match result; NOTE(review): presumably {@code true} means the message passes
+     *         the filter and is delivered; confirm against the caller
+     */
+    boolean match(final MessageExt msg, final FilterContext context);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/impl/Op.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/impl/Op.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/impl/Op.java
new file mode 100644
index 0000000..f83a5f5
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/impl/Op.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.filter.impl;
+
+/**
+ * Base class of the tokens produced when a filter expression is parsed; see the subclasses
+ * {@code Operand} and {@code Operator}. A token is identified by its textual symbol.
+ */
+public abstract class Op {
+
+    // Text of the token exactly as given to the constructor; never reassigned.
+    private final String symbol;
+
+
+    /**
+     * @param symbol text of the token; stored as is, without validation
+     */
+    protected Op(String symbol) {
+        this.symbol = symbol;
+    }
+
+
+    /**
+     * @return the token text passed to the constructor
+     */
+    public String getSymbol() {
+        return symbol;
+    }
+
+
+    /**
+     * @return the token text, same value as {@link #getSymbol()}
+     */
+    @Override
+    public String toString() {
+        return symbol;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/impl/Operand.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/impl/Operand.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/impl/Operand.java
new file mode 100644
index 0000000..95ca663
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/impl/Operand.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.filter.impl;
+
+/**
+ * Token representing an operand of a filter expression. {@code PolishExpr} creates one for
+ * each word it scans; the class adds nothing to {@link Op} beyond its type.
+ */
+public class Operand extends Op {
+
+    /**
+     * @param symbol text of the operand
+     */
+    public Operand(String symbol) {
+        super(symbol);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/impl/Operator.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/impl/Operator.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/impl/Operator.java
new file mode 100644
index 0000000..c906d72
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/impl/Operator.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.filter.impl;
+
+/**
+ * Operator token of a filter expression. Only the four shared instances declared below
+ * exist (the constructor is private), so operators can be compared by identity.
+ */
+public class Operator extends Op {
+
+    public static final Operator LEFTPARENTHESIS = new Operator("(", 30, false);
+    public static final Operator RIGHTPARENTHESIS = new Operator(")", 30, false);
+    public static final Operator AND = new Operator("&&", 20, true);
+    public static final Operator OR = new Operator("||", 15, true);
+
+    // Larger value means the operator binds more tightly.
+    private final int priority;
+    // False for the parentheses, true for the binary operators.
+    private final boolean compareable;
+
+
+    private Operator(String symbol, int priority, boolean compareable) {
+        super(symbol);
+        this.priority = priority;
+        this.compareable = compareable;
+    }
+
+    /**
+     * Maps operator text to the matching shared instance.
+     *
+     * @param operator one of "(", ")", "&amp;&amp;" or "||"
+     * @return the shared instance whose symbol equals {@code operator}
+     * @throws IllegalArgumentException for any other text, including null
+     */
+    public static Operator createOperator(String operator) {
+        final Operator[] known = {LEFTPARENTHESIS, RIGHTPARENTHESIS, AND, OR};
+        for (final Operator candidate : known) {
+            if (candidate.getSymbol().equals(operator)) {
+                return candidate;
+            }
+        }
+        throw new IllegalArgumentException("unsupport operator " + operator);
+    }
+
+    /**
+     * @return the priority of this operator
+     */
+    public int getPriority() {
+        return priority;
+    }
+
+    /**
+     * @return whether this operator takes part in priority comparison
+     */
+    public boolean isCompareable() {
+        return compareable;
+    }
+
+
+    /**
+     * Compares priorities.
+     *
+     * @param operator the operator to compare with
+     * @return 1 when this operator has the higher priority, 0 when equal, -1 when lower
+     */
+    public int compare(Operator operator) {
+        final int other = operator.priority;
+        if (this.priority < other) {
+            return -1;
+        }
+        return this.priority == other ? 0 : 1;
+    }
+
+    /**
+     * @param operator operator text to test
+     * @return true when {@code operator} equals this operator's symbol
+     */
+    public boolean isSpecifiedOp(String operator) {
+        return this.getSymbol().equals(operator);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/impl/PolishExpr.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/impl/PolishExpr.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/impl/PolishExpr.java
new file mode 100644
index 0000000..518c45e
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/impl/PolishExpr.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.filter.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Stack;
+
+import static com.alibaba.rocketmq.common.filter.impl.Operator.LEFTPARENTHESIS;
+import static com.alibaba.rocketmq.common.filter.impl.Operator.RIGHTPARENTHESIS;
+import static com.alibaba.rocketmq.common.filter.impl.Operator.createOperator;
+
+/**
+ * Converts an infix filter expression such as {@code (TagA || TagB) && TagC} into reverse
+ * Polish (postfix) order.
+ *
+ * Operands are words made of ASCII letters, digits and '_'. Operators are {@code &&},
+ * {@code ||} and the two parentheses. Spaces and tabs separate tokens.
+ */
+public class PolishExpr {
+
+    /**
+     * Tokenizes {@code expression} and reorders the tokens into postfix order.
+     *
+     * @param expression infix expression; must not be null
+     * @return the tokens in postfix order
+     * @throws IllegalArgumentException if the expression contains an unsupported character or
+     *         operator, or its parentheses do not match
+     */
+    public static List<Op> reversePolish(String expression) {
+        return reversePolish(participle(expression));
+    }
+
+    /**
+     * Shunting-yard algorithm <br/>
+     * http://en.wikipedia.org/wiki/Shunting_yard_algorithm
+     *
+     * @param tokens tokens in infix order
+     * @return the compute result of Shunting-yard algorithm, i.e. the tokens in postfix order
+     *         with the parentheses removed
+     * @throws IllegalArgumentException if parentheses do not match or a token is neither an
+     *         {@link Operand} nor an {@link Operator}
+     */
+    public static List<Op> reversePolish(List<Op> tokens) {
+        List<Op> segments = new ArrayList<Op>();
+        Stack<Operator> operatorStack = new Stack<Operator>();
+
+        for (int i = 0; i < tokens.size(); i++) {
+            Op token = tokens.get(i);
+            if (isOperand(token)) {
+                // Operands go straight to the output.
+                segments.add(token);
+            } else if (isLeftParenthesis(token)) {
+                operatorStack.push((Operator) token);
+            } else if (isRightParenthesis(token)) {
+                // Move operators to the output until the matching "(" is popped.
+                Operator opNew = null;
+                while (!operatorStack.empty() && LEFTPARENTHESIS != (opNew = operatorStack.pop())) {
+                    segments.add(opNew);
+                }
+                if (null == opNew || LEFTPARENTHESIS != opNew) {
+                    throw new IllegalArgumentException("mismatched parentheses");
+                }
+            } else if (isOperator(token)) {
+                // An operator of equal or higher priority on top of the stack is emitted first.
+                // A "(" on top is not comparable and therefore stays where it is.
+                Operator opNew = (Operator) token;
+                if (!operatorStack.empty()) {
+                    Operator opOld = operatorStack.peek();
+                    if (opOld.isCompareable() && opNew.compare(opOld) != 1) {
+                        segments.add(operatorStack.pop());
+                    }
+                }
+                operatorStack.push(opNew);
+            } else {
+                throw new IllegalArgumentException("illegal token " + token);
+            }
+        }
+
+        // Whatever is left on the stack follows the operands; a leftover parenthesis is an error.
+        while (!operatorStack.empty()) {
+            Operator operator = operatorStack.pop();
+            if (LEFTPARENTHESIS == operator || RIGHTPARENTHESIS == operator) {
+                throw new IllegalArgumentException("mismatched parentheses " + operator);
+            }
+            segments.add(operator);
+        }
+
+        return segments;
+    }
+
+    /**
+     * Splits an expression into operand and operator tokens, in the order they appear.
+     *
+     * The scanner remembers the type of the previous character ({@code preType}) together with
+     * the start and length of the word being read, and emits a token whenever the character
+     * type changes. It does not check that operands and operators alternate.
+     *
+     * @param expression infix expression; must not be null
+     * @return the tokens in infix order
+     * @throws IllegalArgumentException if a character is not a letter, digit, '_', '&amp;', '|',
+     *         parenthesis, space or tab, or if a run of '&amp;'/'|' is not a supported operator
+     */
+    private static List<Op> participle(String expression) {
+        List<Op> segments = new ArrayList<Op>();
+
+        int size = expression.length();
+        int wordStartIndex = -1;
+        int wordLen = 0;
+        Type preType = Type.NULL;
+
+        for (int i = 0; i < size; i++) {
+            char ch = expression.charAt(i);
+
+            if (isWordChar(ch)) {
+                // First character of an operand: emit a pending operator and start a new word.
+                if (Type.OPERATOR == preType || Type.SEPAERATOR == preType || Type.NULL == preType
+                        || Type.PARENTHESIS == preType) {
+                    if (Type.OPERATOR == preType) {
+                        segments.add(createOperator(expression.substring(wordStartIndex, wordStartIndex
+                                + wordLen)));
+                    }
+                    wordStartIndex = i;
+                    wordLen = 0;
+                }
+                preType = Type.OPERAND;
+                wordLen++;
+            } else if ('(' == ch || ')' == ch) {
+                // A parenthesis ends the pending word and is a token of its own.
+                if (Type.OPERATOR == preType) {
+                    segments.add(createOperator(expression
+                            .substring(wordStartIndex, wordStartIndex + wordLen)));
+                    wordStartIndex = -1;
+                    wordLen = 0;
+                } else if (Type.OPERAND == preType) {
+                    segments.add(new Operand(expression.substring(wordStartIndex, wordStartIndex + wordLen)));
+                    wordStartIndex = -1;
+                    wordLen = 0;
+                }
+
+                preType = Type.PARENTHESIS;
+                segments.add(createOperator(String.valueOf(ch)));
+            } else if ('&' == ch || '|' == ch) {
+                // First character of an operator: emit a pending operand and start a new word.
+                // Type.NULL is included so that an expression starting with an operator gets a
+                // valid start index instead of failing later on substring(-1, ...).
+                if (Type.OPERAND == preType || Type.SEPAERATOR == preType || Type.NULL == preType
+                        || Type.PARENTHESIS == preType) {
+                    if (Type.OPERAND == preType) {
+                        segments.add(new Operand(expression.substring(wordStartIndex, wordStartIndex
+                                + wordLen)));
+                    }
+                    wordStartIndex = i;
+                    wordLen = 0;
+                }
+                preType = Type.OPERATOR;
+                wordLen++;
+            } else if (' ' == ch || '\t' == ch) {
+                // Whitespace ends the pending word and produces no token.
+                if (Type.OPERATOR == preType) {
+                    segments.add(createOperator(expression
+                            .substring(wordStartIndex, wordStartIndex + wordLen)));
+                    wordStartIndex = -1;
+                    wordLen = 0;
+                } else if (Type.OPERAND == preType) {
+                    segments.add(new Operand(expression.substring(wordStartIndex, wordStartIndex + wordLen)));
+                    wordStartIndex = -1;
+                    wordLen = 0;
+                }
+                preType = Type.SEPAERATOR;
+            } else {
+                throw new IllegalArgumentException("illegal expression, at index " + i + " " + ch);
+            }
+
+        }
+
+        // Emit the word that was still being read when the input ended. wordLen is only
+        // non-zero while inside an operand or an operator, so preType tells which one it is.
+        if (wordLen > 0) {
+            String word = expression.substring(wordStartIndex, wordStartIndex + wordLen);
+            if (Type.OPERATOR == preType) {
+                segments.add(createOperator(word));
+            } else {
+                segments.add(new Operand(word));
+            }
+        }
+        return segments;
+    }
+
+    /**
+     * Tells whether {@code ch} may appear in an operand: ASCII letter, digit (including '0')
+     * or underscore.
+     */
+    private static boolean isWordChar(char ch) {
+        return ('a' <= ch && ch <= 'z') || ('A' <= ch && ch <= 'Z')
+                || ('0' <= ch && ch <= '9') || '_' == ch;
+    }
+
+    /**
+     * @param token token to test
+     * @return true when the token is an {@link Operand}
+     */
+    public static boolean isOperand(Op token) {
+        return token instanceof Operand;
+    }
+
+    /**
+     * @param token token to test
+     * @return true when the token is the shared "(" operator instance
+     */
+    public static boolean isLeftParenthesis(Op token) {
+        return token instanceof Operator && LEFTPARENTHESIS == (Operator) token;
+    }
+
+    /**
+     * @param token token to test
+     * @return true when the token is the shared ")" operator instance
+     */
+    public static boolean isRightParenthesis(Op token) {
+        return token instanceof Operator && RIGHTPARENTHESIS == (Operator) token;
+    }
+
+    /**
+     * @param token token to test
+     * @return true when the token is any {@link Operator}, parentheses included
+     */
+    public static boolean isOperator(Op token) {
+        return token instanceof Operator;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/impl/Type.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/impl/Type.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/impl/Type.java
new file mode 100644
index 0000000..1c0b343
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/impl/Type.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.filter.impl;
+
+/**
+ * Character classes tracked by the tokenizer in {@code PolishExpr}: it records the type of
+ * the previously scanned character to decide where one token ends and the next begins.
+ */
+public enum Type {
+    // Nothing has been scanned yet.
+    NULL,
+    // Letter, digit or underscore, i.e. part of an operand word.
+    OPERAND,
+    // '&' or '|', i.e. part of an operator.
+    OPERATOR,
+    // '(' or ')'.
+    PARENTHESIS,
+    // Space or tab. The spelling is kept as is because other classes refer to it by name.
+    SEPAERATOR;
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/help/FAQUrl.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/help/FAQUrl.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/help/FAQUrl.java
new file mode 100644
index 0000000..06a74a6
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/help/FAQUrl.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.common.help;
+
+/**
+ * Catalogue of FAQ links plus helpers that append such a link to a message.
+ *
+ * @author shijia.wxr
+ */
+public class FAQUrl {
+
+    public static final String APPLY_TOPIC_URL =
+            "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&topic_not_exist";
+
+    public static final String NAME_SERVER_ADDR_NOT_EXIST_URL =
+            "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&namesrv_not_exist";
+
+    public static final String GROUP_NAME_DUPLICATE_URL =
+            "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&group_duplicate";
+
+    public static final String CLIENT_PARAMETER_CHECK_URL =
+            "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&parameter_check_failed";
+
+    public static final String SUBSCRIPTION_GROUP_NOT_EXIST =
+            "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&subGroup_not_exist";
+
+    public static final String CLIENT_SERVICE_NOT_OK =
+            "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&service_not_ok";
+
+    // FAQ: No route info of this topic, TopicABC
+    // (points at the same page as APPLY_TOPIC_URL)
+    public static final String NO_TOPIC_ROUTE_INFO =
+            "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&topic_not_exist";
+
+    public static final String LOAD_JSON_EXCEPTION =
+            "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&load_json_exception";
+
+    public static final String SAME_GROUP_DIFFERENT_TOPIC =
+            "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&subscription_exception";
+
+    public static final String MQLIST_NOT_EXIST =
+            "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&queue_not_exist";
+
+    public static final String UNEXPECTED_EXCEPTION_URL =
+            "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&unexpected_exception";
+
+    public static final String SEND_MSG_FAILED =
+            "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&send_msg_failed";
+
+    public static final String UNKNOWN_HOST_EXCEPTION =
+            "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&unknown_host";
+
+    private static final String TIP_STRING_BEGIN = "\nSee ";
+    private static final String TIP_STRING_END = " for further details.";
+
+    /**
+     * Builds the tip suffix that points the reader at {@code url}.
+     *
+     * @param url link to embed in the tip
+     * @return a newline, then "See ", the url and " for further details."
+     */
+    public static String suggestTodo(final String url) {
+        return TIP_STRING_BEGIN + url + TIP_STRING_END;
+    }
+
+    /**
+     * Appends the generic "unexpected exception" link to a message that does
+     * not already carry a tip produced by {@link #suggestTodo(String)}.
+     *
+     * @param errorMessage message to decorate; may be {@code null}
+     * @return {@code errorMessage} unchanged when it is {@code null} or already
+     *         contains the tip prefix, otherwise the message followed by a
+     *         line referring to {@link #UNEXPECTED_EXCEPTION_URL}
+     */
+    public static String attachDefaultURL(final String errorMessage) {
+        // Nothing to decorate, or a tip is already present: hand it back untouched.
+        if (errorMessage == null || errorMessage.contains(TIP_STRING_BEGIN)) {
+            return errorMessage;
+        }
+
+        return errorMessage
+                + "\n"
+                + "For more information, please visit the url, "
+                + UNEXPECTED_EXCEPTION_URL;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/hook/FilterCheckHook.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/hook/FilterCheckHook.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/hook/FilterCheckHook.java
new file mode 100644
index 0000000..f5d9d7e
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/hook/FilterCheckHook.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.hook;
+
+import java.nio.ByteBuffer;
+
+
+/**
+ * Callback contract for a named filter check.
+ *
+ * @author manhong.yqd
+ */
+public interface FilterCheckHook {
+
+    /**
+     * @return the name identifying this hook
+     */
+    String hookName();
+
+    /**
+     * Decides whether the given buffer matches this hook's filter.
+     *
+     * @param isUnitMode flag supplied by the caller
+     *                   (NOTE(review): meaning not visible here -- confirm)
+     * @param byteBuffer data to inspect
+     *                   (presumably an encoded message -- verify against caller)
+     * @return the implementation's match verdict
+     */
+    boolean isFilterMatched(final boolean isUnitMode, final ByteBuffer byteBuffer);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/Message.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/Message.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/Message.java
new file mode 100644
index 0000000..eeb6f52
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/Message.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.common.message;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * A message made of a topic, an integer flag, a body and a string property map.
+ * Tags, keys, delay level, the wait-store flag and the buyer id are all kept
+ * as entries of the property map under {@link MessageConst} keys.
+ *
+ * @author shijia.wxr
+ */
+public class Message implements Serializable {
+    private static final long serialVersionUID = 8445773977080406428L;
+
+    private String topic;
+    private int flag;
+    // Created on first access by putProperty/getProperty; null until then.
+    private Map<String, String> properties;
+    private byte[] body;
+
+    /** Creates an empty message; every field keeps its default value. */
+    public Message() {
+    }
+
+    /** Same as the full constructor with no tags, no keys, flag 0 and waitStoreMsgOK true. */
+    public Message(String topic, byte[] body) {
+        this(topic, "", "", 0, body, true);
+    }
+
+    /** Same as the full constructor with no keys, flag 0 and waitStoreMsgOK true. */
+    public Message(String topic, String tags, byte[] body) {
+        this(topic, tags, "", 0, body, true);
+    }
+
+    /** Same as the full constructor with flag 0 and waitStoreMsgOK true. */
+    public Message(String topic, String tags, String keys, byte[] body) {
+        this(topic, tags, keys, 0, body, true);
+    }
+
+    /**
+     * Full constructor.
+     *
+     * @param topic          topic of the message
+     * @param tags           stored as a property only when non-null and non-empty
+     * @param keys           stored as a property only when non-null and non-empty
+     * @param flag           integer flag
+     * @param body           payload; the array is stored by reference
+     * @param waitStoreMsgOK always stored as a property
+     */
+    public Message(String topic, String tags, String keys, int flag, byte[] body, boolean waitStoreMsgOK) {
+        this.topic = topic;
+        this.flag = flag;
+        this.body = body;
+
+        if (tags != null && !tags.isEmpty()) {
+            this.setTags(tags);
+        }
+
+        if (keys != null && !keys.isEmpty()) {
+            this.setKeys(keys);
+        }
+
+        this.setWaitStoreMsgOK(waitStoreMsgOK);
+    }
+
+    /** Stores {@code value} under {@code name}, creating the property map when absent. */
+    void putProperty(final String name, final String value) {
+        if (this.properties == null) {
+            this.properties = new HashMap<String, String>();
+        }
+
+        this.properties.put(name, value);
+    }
+
+    /** Removes {@code name} from the property map; does nothing when no map exists yet. */
+    void clearProperty(final String name) {
+        if (this.properties == null) {
+            return;
+        }
+
+        this.properties.remove(name);
+    }
+
+    /**
+     * Stores a caller-defined property.
+     *
+     * @throws RuntimeException when {@code name} is contained in
+     *                          {@code MessageConst.STRING_HASH_SET}
+     */
+    public void putUserProperty(final String name, final String value) {
+        if (MessageConst.STRING_HASH_SET.contains(name)) {
+            String reason = String.format(
+                    "The Property<%s> is used by system, input another please", name);
+            throw new RuntimeException(reason);
+        }
+        this.putProperty(name, value);
+    }
+
+    /** Delegates to {@link #getProperty(String)}. */
+    public String getUserProperty(final String name) {
+        return this.getProperty(name);
+    }
+
+    /**
+     * Looks up a property.
+     *
+     * Note that this getter also creates the property map when it is still
+     * {@code null}, so {@link #getProperties()} returns an empty map afterwards.
+     *
+     * @return the stored value, or {@code null} when the map has no such key
+     */
+    public String getProperty(final String name) {
+        if (this.properties == null) {
+            this.properties = new HashMap<String, String>();
+        }
+
+        return this.properties.get(name);
+    }
+
+    public String getTopic() {
+        return this.topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public String getTags() {
+        return this.getProperty(MessageConst.PROPERTY_TAGS);
+    }
+
+    public void setTags(String tags) {
+        this.putProperty(MessageConst.PROPERTY_TAGS, tags);
+    }
+
+    public String getKeys() {
+        return this.getProperty(MessageConst.PROPERTY_KEYS);
+    }
+
+    public void setKeys(String keys) {
+        this.putProperty(MessageConst.PROPERTY_KEYS, keys);
+    }
+
+    /**
+     * Joins the keys, each followed by {@code MessageConst.KEY_SEPARATOR},
+     * trims the joined text and stores it through {@link #setKeys(String)}.
+     */
+    public void setKeys(Collection<String> keys) {
+        StringBuilder joined = new StringBuilder();
+        for (String key : keys) {
+            joined.append(key).append(MessageConst.KEY_SEPARATOR);
+        }
+
+        this.setKeys(joined.toString().trim());
+    }
+
+    /**
+     * @return the delay level parsed from its property, or 0 when the property is absent
+     * @throws NumberFormatException when the stored text is not an integer
+     */
+    public int getDelayTimeLevel() {
+        String level = this.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
+        return level != null ? Integer.parseInt(level) : 0;
+    }
+
+    public void setDelayTimeLevel(int level) {
+        this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
+    }
+
+    /** @return {@code true} when the property is absent, otherwise its parsed boolean value */
+    public boolean isWaitStoreMsgOK() {
+        String stored = this.getProperty(MessageConst.PROPERTY_WAIT_STORE_MSG_OK);
+        return stored == null || Boolean.parseBoolean(stored);
+    }
+
+    public void setWaitStoreMsgOK(boolean waitStoreMsgOK) {
+        this.putProperty(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, Boolean.toString(waitStoreMsgOK));
+    }
+
+    public int getFlag() {
+        return this.flag;
+    }
+
+    public void setFlag(int flag) {
+        this.flag = flag;
+    }
+
+    /** @return the body array itself, not a copy */
+    public byte[] getBody() {
+        return this.body;
+    }
+
+    public void setBody(byte[] body) {
+        this.body = body;
+    }
+
+    /** @return the live property map, or {@code null} when it has not been created yet */
+    public Map<String, String> getProperties() {
+        return this.properties;
+    }
+
+    /** Replaces the whole property map with the given reference. */
+    void setProperties(Map<String, String> properties) {
+        this.properties = properties;
+    }
+
+    public String getBuyerId() {
+        return this.getProperty(MessageConst.PROPERTY_BUYER_ID);
+    }
+
+    public void setBuyerId(String buyerId) {
+        this.putProperty(MessageConst.PROPERTY_BUYER_ID, buyerId);
+    }
+
+    /** Renders topic, flag and properties, plus the body length (0 for a null body). */
+    @Override
+    public String toString() {
+        int bodyLength = (body != null) ? body.length : 0;
+        return "Message [topic=" + topic + ", flag=" + flag + ", properties=" + properties + ", body="
+                + bodyLength + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageAccessor.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageAccessor.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageAccessor.java
new file mode 100644
index 0000000..bbbca1a
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageAccessor.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.message;
+
+import java.util.Map;
+
+
+/**
+ * Static bridge to the package-private property operations of {@link Message},
+ * plus typed getter/setter pairs for a set of {@link MessageConst} property keys.
+ */
+public class MessageAccessor {
+
+    // ---- generic property access -------------------------------------------
+
+    /** Stores {@code value} under {@code name} on {@code msg}. */
+    public static void putProperty(final Message msg, final String name, final String value) {
+        msg.putProperty(name, value);
+    }
+
+    /** Removes the property {@code name} from {@code msg}. */
+    public static void clearProperty(final Message msg, final String name) {
+        msg.clearProperty(name);
+    }
+
+    /** Replaces the whole property map of {@code msg}. */
+    public static void setProperties(final Message msg, Map<String, String> properties) {
+        msg.setProperties(properties);
+    }
+
+    // ---- transfer flag -----------------------------------------------------
+
+    public static String getTransferFlag(final Message msg) {
+        return msg.getProperty(MessageConst.PROPERTY_TRANSFER_FLAG);
+    }
+
+    public static void setTransferFlag(final Message msg, String unit) {
+        msg.putProperty(MessageConst.PROPERTY_TRANSFER_FLAG, unit);
+    }
+
+    // ---- correction flag ---------------------------------------------------
+
+    public static String getCorrectionFlag(final Message msg) {
+        return msg.getProperty(MessageConst.PROPERTY_CORRECTION_FLAG);
+    }
+
+    public static void setCorrectionFlag(final Message msg, String unit) {
+        msg.putProperty(MessageConst.PROPERTY_CORRECTION_FLAG, unit);
+    }
+
+    // ---- origin message id -------------------------------------------------
+
+    public static String getOriginMessageId(final Message msg) {
+        return msg.getProperty(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID);
+    }
+
+    public static void setOriginMessageId(final Message msg, String originMessageId) {
+        msg.putProperty(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, originMessageId);
+    }
+
+    // ---- MQ2 flag ----------------------------------------------------------
+
+    public static String getMQ2Flag(final Message msg) {
+        return msg.getProperty(MessageConst.PROPERTY_MQ2_FLAG);
+    }
+
+    public static void setMQ2Flag(final Message msg, String flag) {
+        msg.putProperty(MessageConst.PROPERTY_MQ2_FLAG, flag);
+    }
+
+    // ---- reconsume counters ------------------------------------------------
+
+    public static String getReconsumeTime(final Message msg) {
+        return msg.getProperty(MessageConst.PROPERTY_RECONSUME_TIME);
+    }
+
+    public static void setReconsumeTime(final Message msg, String reconsumeTimes) {
+        msg.putProperty(MessageConst.PROPERTY_RECONSUME_TIME, reconsumeTimes);
+    }
+
+    public static String getMaxReconsumeTimes(final Message msg) {
+        return msg.getProperty(MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
+    }
+
+    public static void setMaxReconsumeTimes(final Message msg, String maxReconsumeTimes) {
+        msg.putProperty(MessageConst.PROPERTY_MAX_RECONSUME_TIMES, maxReconsumeTimes);
+    }
+
+    // ---- consume start timestamp -------------------------------------------
+
+    public static String getConsumeStartTimeStamp(final Message msg) {
+        return msg.getProperty(MessageConst.PROPERTY_CONSUME_START_TIMESTAMP);
+    }
+
+    public static void setConsumeStartTimeStamp(final Message msg, String propertyConsumeStartTimeStamp) {
+        msg.putProperty(MessageConst.PROPERTY_CONSUME_START_TIMESTAMP, propertyConsumeStartTimeStamp);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageClientExt.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageClientExt.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageClientExt.java
new file mode 100644
index 0000000..0ab372e
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageClientExt.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.common.message;
+
+/**
+ * A {@link MessageExt} whose {@link #getMsgId()} prefers the unique id read
+ * through {@link MessageClientIDSetter#getUniqID(Message)}. The id held by the
+ * superclass is exposed separately as the "offset message id".
+ */
+public class MessageClientExt extends MessageExt {
+
+    /**
+     * Stores the offset message id in the superclass's message-id slot.
+     *
+     * @param offsetMsgId id to store
+     */
+    public void setOffsetMsgId(String offsetMsgId) {
+        super.setMsgId(offsetMsgId);
+    }
+
+    /**
+     * @return the id held by the superclass, i.e. the value last passed to
+     *         {@link #setOffsetMsgId(String)}
+     */
+    public String getOffsetMsgId() {
+        return super.getMsgId();
+    }
+
+    /**
+     * Intentionally does nothing: the id reported by {@link #getMsgId()} is
+     * read from the message itself, and the superclass slot is only written
+     * through {@link #setOffsetMsgId(String)}.
+     *
+     * @param msgId ignored
+     */
+    @Override
+    public void setMsgId(String msgId) {
+        // Deliberate no-op, see the method documentation.
+    }
+
+    /**
+     * @return the unique id when the message carries one, otherwise the
+     *         offset message id
+     */
+    @Override
+    public String getMsgId() {
+        String uniqID = MessageClientIDSetter.getUniqID(this);
+        if (uniqID == null) {
+            return this.getOffsetMsgId();
+        }
+        return uniqID;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageClientIDSetter.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageClientIDSetter.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageClientIDSetter.java
new file mode 100644
index 0000000..82cd3d1
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageClientIDSetter.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.common.message;
+
+import com.alibaba.rocketmq.common.UtilAll;
+
+import java.nio.ByteBuffer;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class MessageClientIDSetter {
+    private static final String TOPIC_KEY_SPLITTER = "#";
+    private static final int LEN;
+    private static final String FIX_STRING;
+    private static final AtomicInteger COUNTER;
+    private static long startTime;
+    private static long nextStartTime;
+
+    static {
+        LEN = 4 + 2 + 4 + 4 + 2;
+        ByteBuffer tempBuffer = ByteBuffer.allocate(10);
+        tempBuffer.position(2);
+        tempBuffer.putInt(UtilAll.getPid());
+        tempBuffer.position(0);
+        try {
+            tempBuffer.put(UtilAll.getIP());
+        } catch (Exception e) {
+            tempBuffer.put(createFakeIP());
+        }
+        tempBuffer.position(6);
+        tempBuffer.putInt(MessageClientIDSetter.class.getClassLoader().hashCode()); //4
+        FIX_STRING = UtilAll.bytes2string(tempBuffer.array());
+        setStartTime(System.currentTimeMillis());
+        COUNTER = new AtomicInteger(0);
+    }
+
+    private synchronized static void setStartTime(long millis) {
+        Calendar cal = Calendar.getInstance();
+        cal.setTimeInMillis(millis);
+        cal.set(Calendar.DAY_OF_MONTH, 1);
+        cal.set(Calendar.HOUR, 0);
+        cal.set(Calendar.MINUTE, 0);
+        cal.set(Calendar.SECOND, 0);
+        cal.set(Calendar.MILLISECOND, 0);
+        startTime = cal.getTimeInMillis();
+        cal.add(Calendar.MONTH, 1);
+        nextStartTime = cal.getTimeInMillis();
+    }
+
+    public static Date getNearlyTimeFromID(String msgID) {
+        ByteBuffer buf = ByteBuffer.allocate(8);
+        byte[] bytes = UtilAll.string2bytes(msgID);
+        buf.put((byte) 0);
+        buf.put((byte) 0);
+        buf.put((byte) 0);
+        buf.put((byte) 0);
+        buf.put(bytes, 10, 4);
+        buf.position(0);
+        long spanMS = buf.getLong();
+        Calendar cal = Calendar.getInstance();
+        long now = cal.getTimeInMillis();
+        cal.set(Calendar.DAY_OF_MONTH, 1);
+        cal.set(Calendar.HOUR, 0);
+        cal.set(Calendar.MINUTE, 0);
+        cal.set(Calendar.SECOND, 0);
+        cal.set(Calendar.MILLISECOND, 0);
+        long monStartTime = cal.getTimeInMillis();
+        if (monStartTime + spanMS >= now) {
+            cal.add(Calendar.MONTH, -1);
+            monStartTime = cal.getTimeInMillis();
+        }
+        cal.setTimeInMillis(monStartTime + spanMS);
+        return cal.getTime();
+    }
+
+    public static String getIPStrFromID(String msgID) {
+        byte[] ipBytes = getIPFromID(msgID);
+        return UtilAll.ipToIPv4Str(ipBytes);
+    }
+
+    public static byte[] getIPFromID(String msgID) {
+        byte[] result = new byte[4];
+        byte[] bytes = UtilAll.string2bytes(msgID);
+        System.arraycopy(bytes, 0, result, 0, 4);
+        return result;
+    }
+
+    public static String createUniqID() {
+        StringBuilder sb = new StringBuilder(LEN * 2);
+        sb.append(FIX_STRING);
+        sb.append(UtilAll.bytes2string(createUniqIDBuffer()));
+        return sb.toString();
+    }
+
+
+    private static byte[] createUniqIDBuffer() {
+        ByteBuffer buffer = ByteBuffer.allocate(4 + 2);
+        long current = System.currentTimeMillis();
+        if (current >= nextStartTime) {
+            setStartTime(current);
+        }
+        buffer.position(0);
+        buffer.putInt((int) (System.currentTimeMillis() - startTime));
+        buffer.putShort((short) COUNTER.getAndIncrement());
+        return buffer.array();
+    }
+
+    public static void setUniqID(final Message msg) {
+        if (msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null) {
+            msg.putProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, createUniqID());
+        }
+    }
+
+    public static String getUniqID(final Message msg) {
+        return msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
+    }
+
+    public static byte[] createFakeIP() {
+        ByteBuffer bb = ByteBuffer.allocate(8);
+        bb.putLong(System.currentTimeMillis());
+        bb.position(4);
+        byte[] fakeIP = new byte[4];
+        bb.get(fakeIP);
+        return fakeIP;
+    }
+}
+