You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/19 09:40:41 UTC
[24/43] incubator-rocketmq git commit: Finish code dump. Reviewed by:
@yukon @vongosling @stevenschew @vintagewang @lollipop @zander
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/UtilAll.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/UtilAll.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/UtilAll.java
new file mode 100644
index 0000000..4429e3d
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/UtilAll.java
@@ -0,0 +1,525 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.common;
+
+import com.alibaba.rocketmq.remoting.common.RemotingHelper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.RuntimeMXBean;
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.text.NumberFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.util.zip.CRC32;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class UtilAll {
+ /** Date-time pattern with second precision; presumably meant for {@code formatDate}/{@code parseDate}. */
+ public static final String YYYY_MM_DD_HH_MM_SS = "yyyy-MM-dd HH:mm:ss";
+ /** Date-time pattern with millisecond precision; note the {@code #} between date and time and the {@code :} before the milliseconds. */
+ public static final String YYYY_MM_DD_HH_MM_SS_SSS = "yyyy-MM-dd#HH:mm:ss:SSS";
+ /** Compact date-time pattern with second precision and no separators. */
+ public static final String YYYY_MMDD_HHMMSS = "yyyyMMddHHmmss";
+
+
+    /**
+     * Returns the process id of the running JVM.
+     *
+     * <p>The id is parsed from the runtime MXBean name: everything in front of the first
+     * {@code '@'} is interpreted as a decimal integer.
+     *
+     * @return the parsed process id, or {@code -1} when the name cannot be parsed
+     */
+    public static int getPid() {
+        final String jvmName = ManagementFactory.getRuntimeMXBean().getName();
+        int pid;
+        try {
+            final int atIndex = jvmName.indexOf('@');
+            pid = Integer.parseInt(jvmName.substring(0, atIndex));
+        } catch (Exception e) {
+            // Missing '@' or a non-numeric prefix: report "unknown".
+            pid = -1;
+        }
+        return pid;
+    }
+
+    /**
+     * Renders the stack trace of the calling thread as text.
+     *
+     * @return one frame per line, each frame preceded by a newline and a tab
+     */
+    public static String currentStackTrace() {
+        final StackTraceElement[] frames = Thread.currentThread().getStackTrace();
+        final StringBuilder text = new StringBuilder();
+        for (int i = 0; i < frames.length; i++) {
+            text.append("\n\t").append(frames[i].toString());
+        }
+        return text.toString();
+    }
+
+    /**
+     * Converts an offset into a fixed-width file name.
+     *
+     * <p>The value is rendered as a decimal number without grouping separators and padded
+     * with leading zeros to at least 20 integer digits.
+     *
+     * @param offset the offset to render
+     * @return the zero-padded decimal representation of {@code offset}
+     */
+    public static String offset2FileName(final long offset) {
+        // Pin the locale: a formatter for the JVM's default locale may emit non-ASCII digits
+        // (e.g. Thai or Arabic-Indic), which would make the file name depend on the host setup.
+        final NumberFormat nf = NumberFormat.getInstance(Locale.ROOT);
+        nf.setMinimumIntegerDigits(20);
+        nf.setMaximumFractionDigits(0);
+        nf.setGroupingUsed(false);
+        return nf.format(offset);
+    }
+
+    /**
+     * Computes how many milliseconds have passed since {@code beginTime}.
+     *
+     * @param beginTime start instant in milliseconds, on the {@link System#currentTimeMillis()} scale
+     * @return the current time in milliseconds minus {@code beginTime}
+     */
+    public static long computeEclipseTimeMilliseconds(final long beginTime) {
+        final long now = System.currentTimeMillis();
+        return now - beginTime;
+    }
+
+
+    /**
+     * Tells whether the current hour of day is one of the hours listed in {@code when}.
+     *
+     * @param when hours of day (0-23) separated by {@code ';'}, e.g. {@code "4;16"}
+     * @return {@code true} if one of the listed hours equals the current hour of day
+     * @throws NumberFormatException if an entry is not a plain integer
+     */
+    public static boolean isItTimeToDo(final String when) {
+        final String[] hours = when.split(";");
+        final Calendar now = Calendar.getInstance();
+        for (int i = 0; i < hours.length; i++) {
+            final int candidate = Integer.parseInt(hours[i]);
+            if (candidate == now.get(Calendar.HOUR_OF_DAY)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+
+    /**
+     * Formats the current time as {@code yyyyMMddHHmmssSSS}.
+     *
+     * @return the current time rendered by {@link #timeMillisToHumanString(long)}
+     */
+    public static String timeMillisToHumanString() {
+        final long now = System.currentTimeMillis();
+        return timeMillisToHumanString(now);
+    }
+
+    /**
+     * Formats a timestamp as {@code yyyyMMddHHmmssSSS} using the default calendar and time zone.
+     *
+     * @param t timestamp in milliseconds since the epoch
+     * @return year, month, day, hour, minute, second and millisecond, zero-padded and concatenated
+     */
+    public static String timeMillisToHumanString(final long t) {
+        final Calendar calendar = Calendar.getInstance();
+        calendar.setTimeInMillis(t);
+        final int year = calendar.get(Calendar.YEAR);
+        // Calendar months are zero-based.
+        final int month = calendar.get(Calendar.MONTH) + 1;
+        final int day = calendar.get(Calendar.DAY_OF_MONTH);
+        final int hour = calendar.get(Calendar.HOUR_OF_DAY);
+        final int minute = calendar.get(Calendar.MINUTE);
+        final int second = calendar.get(Calendar.SECOND);
+        final int millis = calendar.get(Calendar.MILLISECOND);
+        return String.format("%04d%02d%02d%02d%02d%02d%03d", year, month, day, hour, minute, second, millis);
+    }
+
+
+    /**
+     * Computes the instant of the next local midnight.
+     *
+     * @return epoch milliseconds of 00:00:00.000 on the day after today, in the default time zone
+     */
+    public static long computNextMorningTimeMillis() {
+        final Calendar calendar = Calendar.getInstance();
+        calendar.setTimeInMillis(System.currentTimeMillis());
+        calendar.add(Calendar.DAY_OF_MONTH, 1);
+        // Truncate everything below the day.
+        final int[] clearedFields = {Calendar.HOUR_OF_DAY, Calendar.MINUTE, Calendar.SECOND, Calendar.MILLISECOND};
+        for (int i = 0; i < clearedFields.length; i++) {
+            calendar.set(clearedFields[i], 0);
+        }
+        return calendar.getTimeInMillis();
+    }
+
+
+    /**
+     * Computes the instant at which the next minute starts.
+     *
+     * @return epoch milliseconds of the current time plus one minute, with seconds and
+     *         milliseconds set to zero
+     */
+    public static long computNextMinutesTimeMillis() {
+        final Calendar calendar = Calendar.getInstance();
+        calendar.setTimeInMillis(System.currentTimeMillis());
+        calendar.add(Calendar.MINUTE, 1);
+        final int[] clearedFields = {Calendar.SECOND, Calendar.MILLISECOND};
+        for (int i = 0; i < clearedFields.length; i++) {
+            calendar.set(clearedFields[i], 0);
+        }
+        return calendar.getTimeInMillis();
+    }
+
+
+    /**
+     * Computes the instant at which the next full hour starts.
+     *
+     * @return epoch milliseconds of the current time plus one hour, with minutes, seconds and
+     *         milliseconds set to zero
+     */
+    public static long computNextHourTimeMillis() {
+        final Calendar calendar = Calendar.getInstance();
+        calendar.setTimeInMillis(System.currentTimeMillis());
+        calendar.add(Calendar.HOUR_OF_DAY, 1);
+        final int[] clearedFields = {Calendar.MINUTE, Calendar.SECOND, Calendar.MILLISECOND};
+        for (int i = 0; i < clearedFields.length; i++) {
+            calendar.set(clearedFields[i], 0);
+        }
+        return calendar.getTimeInMillis();
+    }
+
+
+    /**
+     * Computes the half-past mark of the hour that follows the current one.
+     *
+     * <p>The current time is advanced by one hour and the minute is then set to 30, so the
+     * result lies between 30 and 90 minutes in the future. It is not the nearest upcoming
+     * half-hour boundary.
+     *
+     * @return epoch milliseconds of (current hour + 1):30:00.000 in the default time zone
+     */
+    public static long computNextHalfHourTimeMillis() {
+        final Calendar calendar = Calendar.getInstance();
+        calendar.setTimeInMillis(System.currentTimeMillis());
+        calendar.add(Calendar.HOUR_OF_DAY, 1);
+        calendar.set(Calendar.MINUTE, 30);
+        final int[] clearedFields = {Calendar.SECOND, Calendar.MILLISECOND};
+        for (int i = 0; i < clearedFields.length; i++) {
+            calendar.set(clearedFields[i], 0);
+        }
+        return calendar.getTimeInMillis();
+    }
+
+
+    /**
+     * Formats a timestamp as {@code yyyy-MM-dd HH:mm:ss,SSS} using the default calendar and
+     * time zone.
+     *
+     * @param t timestamp in milliseconds since the epoch
+     * @return the formatted timestamp with millisecond precision
+     */
+    public static String timeMillisToHumanString2(final long t) {
+        final Calendar calendar = Calendar.getInstance();
+        calendar.setTimeInMillis(t);
+        final int year = calendar.get(Calendar.YEAR);
+        // Calendar months are zero-based.
+        final int month = calendar.get(Calendar.MONTH) + 1;
+        final int day = calendar.get(Calendar.DAY_OF_MONTH);
+        final int hour = calendar.get(Calendar.HOUR_OF_DAY);
+        final int minute = calendar.get(Calendar.MINUTE);
+        final int second = calendar.get(Calendar.SECOND);
+        final int millis = calendar.get(Calendar.MILLISECOND);
+        return String.format("%04d-%02d-%02d %02d:%02d:%02d,%03d", year, month, day, hour, minute, second, millis);
+    }
+
+
+    /**
+     * Formats a timestamp as {@code yyyyMMddHHmmss} using the default calendar and time zone.
+     *
+     * @param t timestamp in milliseconds since the epoch
+     * @return the formatted timestamp with second precision and no separators
+     */
+    public static String timeMillisToHumanString3(final long t) {
+        final Calendar calendar = Calendar.getInstance();
+        calendar.setTimeInMillis(t);
+        final int year = calendar.get(Calendar.YEAR);
+        // Calendar months are zero-based.
+        final int month = calendar.get(Calendar.MONTH) + 1;
+        final int day = calendar.get(Calendar.DAY_OF_MONTH);
+        final int hour = calendar.get(Calendar.HOUR_OF_DAY);
+        final int minute = calendar.get(Calendar.MINUTE);
+        final int second = calendar.get(Calendar.SECOND);
+        return String.format("%04d%02d%02d%02d%02d%02d", year, month, day, hour, minute, second);
+    }
+
+
+    /**
+     * Reports how full the partition containing {@code path} is.
+     *
+     * <p>If the path does not exist yet, an attempt is made to create it as a directory first.
+     * Despite the method name, the result is a fraction between 0 and 1, not a percentage.
+     *
+     * @param path a file or directory on the partition of interest
+     * @return used space divided by total space, or {@code -1} when {@code path} is null or
+     *         empty, the total size is not positive, or any exception occurs
+     */
+    public static double getDiskPartitionSpaceUsedPercent(final String path) {
+        if (path == null || path.isEmpty()) {
+            return -1;
+        }
+
+        double usedRatio = -1;
+        try {
+            final File target = new File(path);
+            if (!target.exists()) {
+                // The outcome of mkdirs() is not inspected.
+                target.mkdirs();
+            }
+
+            final long total = target.getTotalSpace();
+            final long free = target.getFreeSpace();
+            if (total > 0) {
+                usedRatio = (total - free) / (double) total;
+            }
+        } catch (Exception e) {
+            usedRatio = -1;
+        }
+        return usedRatio;
+    }
+
+
+    /**
+     * Computes a non-negative CRC-32 checksum over a whole array.
+     *
+     * @param array the bytes to checksum; may be {@code null}
+     * @return the checksum of all bytes, or {@code 0} when {@code array} is {@code null}
+     */
+    public static final int crc32(byte[] array) {
+        return array == null ? 0 : crc32(array, 0, array.length);
+    }
+
+    /**
+     * Computes a non-negative CRC-32 checksum over a slice of an array.
+     *
+     * @param array  the bytes to checksum
+     * @param offset index of the first byte to include
+     * @param length number of bytes to include
+     * @return the CRC-32 value with its highest bit cleared, so the result is never negative
+     */
+    public static final int crc32(byte[] array, int offset, int length) {
+        final CRC32 checksum = new CRC32();
+        checksum.update(array, offset, length);
+        // Keep only the low 31 bits.
+        final long masked = checksum.getValue() & 0x7FFFFFFF;
+        return (int) masked;
+    }
+
+    /** Upper-case hexadecimal digits, indexed by nibble value. */
+    static final char[] HEX_ARRAY = "0123456789ABCDEF".toCharArray();
+
+    /**
+     * Encodes bytes as an upper-case hexadecimal string, two characters per byte.
+     *
+     * @param src the bytes to encode; must not be {@code null}
+     * @return the hexadecimal representation, empty for an empty array
+     */
+    public static String bytes2string(byte[] src) {
+        final StringBuilder hex = new StringBuilder(src.length * 2);
+        for (byte b : src) {
+            final int unsigned = b & 0xFF;
+            hex.append(HEX_ARRAY[unsigned >>> 4]);
+            hex.append(HEX_ARRAY[unsigned & 0x0F]);
+        }
+        return hex.toString();
+    }
+
+    /**
+     * Decodes a hexadecimal string into bytes, two characters per byte.
+     *
+     * <p>The input is upper-cased first. A trailing unpaired character is ignored, and
+     * characters that are not hexadecimal digits are not rejected.
+     *
+     * @param hexString the text to decode; may be {@code null}
+     * @return the decoded bytes, or {@code null} when the input is {@code null} or empty
+     */
+    public static byte[] string2bytes(String hexString) {
+        if (hexString == null || hexString.length() == 0) {
+            return null;
+        }
+        final char[] digits = hexString.toUpperCase().toCharArray();
+        final byte[] decoded = new byte[digits.length / 2];
+        for (int i = 0, pos = 0; i < decoded.length; i++, pos += 2) {
+            final int high = charToByte(digits[pos]) << 4;
+            final int low = charToByte(digits[pos + 1]);
+            decoded[i] = (byte) (high | low);
+        }
+        return decoded;
+    }
+
+    /**
+     * Maps an upper-case hexadecimal digit to its value.
+     *
+     * @param c the character to look up
+     * @return 0-15 for a hexadecimal digit, {@code -1} for any other character
+     */
+    private static byte charToByte(char c) {
+        final int index = "0123456789ABCDEF".indexOf(c);
+        return (byte) index;
+    }
+
+
+    /**
+     * Inflates data that was compressed in the format read by {@link InflaterInputStream}.
+     *
+     * @param src the compressed bytes; must not be {@code null}
+     * @return the decompressed bytes
+     * @throws IOException if the input cannot be inflated
+     */
+    public static byte[] uncompress(final byte[] src) throws IOException {
+        final byte[] chunk = new byte[src.length];
+        final ByteArrayInputStream rawIn = new ByteArrayInputStream(src);
+        final InflaterInputStream inflaterIn = new InflaterInputStream(rawIn);
+        final ByteArrayOutputStream inflated = new ByteArrayOutputStream(src.length);
+
+        try {
+            int read;
+            while ((read = inflaterIn.read(chunk, 0, chunk.length)) > 0) {
+                inflated.write(chunk, 0, read);
+            }
+            inflated.flush();
+            return inflated.toByteArray();
+        } finally {
+            // Close every stream independently; a failure to close is ignored.
+            try {
+                rawIn.close();
+            } catch (IOException ignored) {
+            }
+            try {
+                inflaterIn.close();
+            } catch (IOException ignored) {
+            }
+            try {
+                inflated.close();
+            } catch (IOException ignored) {
+            }
+        }
+    }
+
+
+    /**
+     * Compresses data with a {@link java.util.zip.Deflater} at the given level.
+     *
+     * @param src   the bytes to compress; must not be {@code null}
+     * @param level the compression level handed to the {@code Deflater} constructor
+     * @return the compressed bytes
+     * @throws IOException if writing to the deflater stream fails
+     */
+    public static byte[] compress(final byte[] src, final int level) throws IOException {
+        final ByteArrayOutputStream compressed = new ByteArrayOutputStream(src.length);
+        final java.util.zip.Deflater deflater = new java.util.zip.Deflater(level);
+        final DeflaterOutputStream deflaterOut = new DeflaterOutputStream(compressed, deflater);
+        try {
+            deflaterOut.write(src);
+            deflaterOut.finish();
+            deflaterOut.close();
+            return compressed.toByteArray();
+        } finally {
+            try {
+                compressed.close();
+            } catch (IOException ignored) {
+            }
+
+            // The deflater was created by this method, so it is ended here on every path.
+            deflater.end();
+        }
+    }
+
+
+    /**
+     * Parses a decimal {@code int}, falling back to a default on failure.
+     *
+     * @param str          the text to parse; may be {@code null}
+     * @param defaultValue the value returned when parsing fails
+     * @return the parsed value, or {@code defaultValue} if {@code str} cannot be parsed
+     */
+    public static int asInt(String str, int defaultValue) {
+        int parsed;
+        try {
+            parsed = Integer.parseInt(str);
+        } catch (Exception e) {
+            parsed = defaultValue;
+        }
+        return parsed;
+    }
+
+
+    /**
+     * Parses a decimal {@code long}, falling back to a default on failure.
+     *
+     * @param str          the text to parse; may be {@code null}
+     * @param defaultValue the value returned when parsing fails
+     * @return the parsed value, or {@code defaultValue} if {@code str} cannot be parsed
+     */
+    public static long asLong(String str, long defaultValue) {
+        long parsed;
+        try {
+            parsed = Long.parseLong(str);
+        } catch (Exception e) {
+            parsed = defaultValue;
+        }
+        return parsed;
+    }
+
+
+    /**
+     * Formats a date with a {@link SimpleDateFormat} pattern.
+     *
+     * @param date    the date to format
+     * @param pattern a {@code SimpleDateFormat} pattern
+     * @return the formatted date
+     */
+    public static String formatDate(Date date, String pattern) {
+        return new SimpleDateFormat(pattern).format(date);
+    }
+
+
+    /**
+     * Parses a date with a {@link SimpleDateFormat} pattern.
+     *
+     * @param date    the text to parse
+     * @param pattern a {@code SimpleDateFormat} pattern
+     * @return the parsed date, or {@code null} if the text does not match the pattern
+     */
+    public static Date parseDate(String date, String pattern) {
+        final SimpleDateFormat parser = new SimpleDateFormat(pattern);
+        Date parsed = null;
+        try {
+            parsed = parser.parse(date);
+        } catch (ParseException e) {
+            // Unparseable input is reported as null.
+        }
+        return parsed;
+    }
+
+
+    /**
+     * Renders a response code as its decimal string.
+     *
+     * @param code the numeric code
+     * @return the decimal representation of {@code code}
+     */
+    public static String responseCode2String(final int code) {
+        return String.valueOf(code);
+    }
+
+
+    /**
+     * Truncates a string to its first {@code size} characters.
+     *
+     * <p>Despite the method name, the result holds at most {@code size} characters.
+     *
+     * @param str  the text to shorten; may be {@code null}
+     * @param size the maximum number of characters to keep
+     * @return the leading {@code size} characters when {@code str} is longer than that,
+     *         otherwise {@code str} itself (including {@code null})
+     */
+    public static String frontStringAtLeast(final String str, final int size) {
+        if (str == null || str.length() <= size) {
+            return str;
+        }
+        return str.substring(0, size);
+    }
+
+
+    /**
+     * Checks whether a string is {@code null}, empty or made up of whitespace only.
+     *
+     * @param str the text to inspect; may be {@code null}
+     * @return {@code true} if no character fails {@link Character#isWhitespace(char)}
+     */
+    public static boolean isBlank(String str) {
+        if (str == null) {
+            return true;
+        }
+        for (char c : str.toCharArray()) {
+            if (!Character.isWhitespace(c)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+
+    /**
+     * Dumps the stack traces of all live threads.
+     *
+     * @return the text produced by {@link #jstack(Map)} for {@link Thread#getAllStackTraces()}
+     */
+    public static String jstack() {
+        final Map<Thread, StackTraceElement[]> traces = Thread.getAllStackTraces();
+        return jstack(traces);
+    }
+
+    /**
+     * Renders the given thread stack traces as text.
+     *
+     * <p>For every thread with at least one frame, a header line with name, id and state is
+     * written, followed by one line per frame (each prefixed with the thread name) and a blank
+     * line. If anything goes wrong, a short description of the failure is appended instead.
+     *
+     * @param map threads and their stack frames
+     * @return the rendered dump; empty when no thread has any frames
+     */
+    public static String jstack(Map<Thread, StackTraceElement[]> map) {
+        final StringBuilder dump = new StringBuilder();
+        try {
+            for (Map.Entry<Thread, StackTraceElement[]> entry : map.entrySet()) {
+                final StackTraceElement[] frames = entry.getValue();
+                final Thread thread = entry.getKey();
+                if (frames == null || frames.length == 0) {
+                    continue;
+                }
+
+                final String threadName = thread.getName();
+                dump.append(String.format("%-40sTID: %d STATE: %s%n", threadName, thread.getId(), thread.getState()));
+                for (int i = 0; i < frames.length; i++) {
+                    dump.append(String.format("%-40s%s%n", threadName, frames[i].toString()));
+                }
+                dump.append("\n");
+            }
+        } catch (Throwable e) {
+            dump.append(RemotingHelper.exceptionSimpleDesc(e));
+        }
+
+        return dump.toString();
+    }
+
+    /**
+     * Tells whether an IPv4 address lies in one of the private ranges
+     * 10.0.0.0-10.255.255.255, 172.16.0.0-172.31.255.255 or 192.168.0.0-192.168.255.255.
+     *
+     * @param ip the four address bytes in network order
+     * @return {@code true} if the address is in one of the three private ranges
+     * @throws RuntimeException if {@code ip} does not hold exactly four bytes
+     */
+    public static boolean isInternalIP(byte[] ip) {
+        if (ip.length != 4) {
+            throw new RuntimeException("illegal ipv4 bytes");
+        }
+
+        // Work on unsigned octets so the ranges read like dotted-decimal notation.
+        final int first = ip[0] & 0xFF;
+        final int second = ip[1] & 0xFF;
+
+        if (first == 10) {
+            return true;
+        }
+        if (first == 172) {
+            return second >= 16 && second <= 31;
+        }
+        if (first == 192) {
+            return second == 168;
+        }
+        return false;
+    }
+
+    /**
+     * Filters IPv4 addresses by their first octet and trailing octets.
+     *
+     * <ul>
+     * <li>first octet 1-126: rejected when the remaining three octets are all 1 or all 0;</li>
+     * <li>first octet 128-191: rejected when the last two octets are both 1 or both 0;</li>
+     * <li>first octet 192-223: rejected when the last octet is 1 or 0;</li>
+     * <li>any other first octet (0, 127, 224 and above): rejected.</li>
+     * </ul>
+     *
+     * @param ip the four address bytes in network order
+     * @return {@code true} if the address passes the rules above
+     * @throws RuntimeException if {@code ip} does not hold exactly four bytes
+     */
+    private static boolean ipCheck(byte[] ip) {
+        if (ip.length != 4) {
+            throw new RuntimeException("illegal ipv4 bytes");
+        }
+
+        // Work on unsigned octets so the ranges read like dotted-decimal notation.
+        final int first = ip[0] & 0xFF;
+        final int second = ip[1] & 0xFF;
+        final int third = ip[2] & 0xFF;
+        final int fourth = ip[3] & 0xFF;
+
+        if (first >= 1 && first <= 126) {
+            final boolean allOnes = second == 1 && third == 1 && fourth == 1;
+            final boolean allZeros = second == 0 && third == 0 && fourth == 0;
+            return !(allOnes || allZeros);
+        }
+        if (first >= 128 && first <= 191) {
+            final boolean bothOnes = third == 1 && fourth == 1;
+            final boolean bothZeros = third == 0 && fourth == 0;
+            return !(bothOnes || bothZeros);
+        }
+        if (first >= 192 && first <= 223) {
+            return fourth != 1 && fourth != 0;
+        }
+        return false;
+    }
+
+    /**
+     * Renders four address bytes in dotted-decimal notation.
+     *
+     * @param ip the address bytes in network order
+     * @return the dotted-decimal text, or {@code null} if {@code ip} does not hold exactly four bytes
+     */
+    public static String ipToIPv4Str(byte[] ip) {
+        if (ip.length != 4) {
+            return null;
+        }
+        final StringBuilder dotted = new StringBuilder();
+        for (int i = 0; i < ip.length; i++) {
+            if (i > 0) {
+                dotted.append(".");
+            }
+            // Mask to print the octet as an unsigned value.
+            dotted.append(ip[i] & 0xFF);
+        }
+        return dotted.toString();
+    }
+
+    /**
+     * Picks an IPv4 address of the local machine.
+     *
+     * <p>All addresses of all network interfaces are scanned. The first IPv4 address that
+     * passes {@code ipCheck} and is not an internal address is returned immediately; if none
+     * is found, the first internal address that passed {@code ipCheck} is returned.
+     *
+     * @return the four bytes of the selected address
+     * @throws RuntimeException if no suitable address exists or the lookup fails; the
+     *         "no address" failure is raised inside the guarded region and is therefore
+     *         wrapped as the cause of the exception that reaches the caller
+     */
+    public static byte[] getIP() {
+        try {
+            final Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
+            byte[] firstInternal = null;
+            while (interfaces.hasMoreElements()) {
+                final Enumeration<InetAddress> addresses = interfaces.nextElement().getInetAddresses();
+                while (addresses.hasMoreElements()) {
+                    final InetAddress candidate = addresses.nextElement();
+                    if (!(candidate instanceof Inet4Address)) {
+                        continue;
+                    }
+                    final byte[] raw = candidate.getAddress();
+                    if (raw.length != 4 || !ipCheck(raw)) {
+                        continue;
+                    }
+                    if (!isInternalIP(raw)) {
+                        return raw;
+                    }
+                    if (firstInternal == null) {
+                        firstInternal = raw;
+                    }
+                }
+            }
+            if (firstInternal == null) {
+                throw new RuntimeException("Can not get local ip");
+            }
+            return firstInternal;
+        } catch (Exception e) {
+            throw new RuntimeException("Can not get local ip", e);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/admin/ConsumeStats.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/admin/ConsumeStats.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/admin/ConsumeStats.java
new file mode 100644
index 0000000..d8c9311
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/admin/ConsumeStats.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.common.admin;
+
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+
+/**
+ * Holds a table of per-queue offsets together with a consume TPS figure.
+ *
+ * @author shijia.wxr
+ */
+public class ConsumeStats extends RemotingSerializable {
+    private HashMap<MessageQueue, OffsetWrapper> offsetTable = new HashMap<MessageQueue, OffsetWrapper>();
+    private double consumeTps = 0;
+
+    /**
+     * Sums, over every entry of the offset table, the broker offset minus the consumer offset.
+     *
+     * @return the accumulated difference; {@code 0} for an empty table
+     */
+    public long computeTotalDiff() {
+        long total = 0L;
+        for (OffsetWrapper wrapper : this.offsetTable.values()) {
+            total += wrapper.getBrokerOffset() - wrapper.getConsumerOffset();
+        }
+        return total;
+    }
+
+    /** @return the offset table held by this object (the live map, not a copy) */
+    public HashMap<MessageQueue, OffsetWrapper> getOffsetTable() {
+        return offsetTable;
+    }
+
+    /** @param offsetTable the offset table to hold from now on */
+    public void setOffsetTable(HashMap<MessageQueue, OffsetWrapper> offsetTable) {
+        this.offsetTable = offsetTable;
+    }
+
+    /** @return the stored consume TPS value */
+    public double getConsumeTps() {
+        return consumeTps;
+    }
+
+    /** @param consumeTps the consume TPS value to store */
+    public void setConsumeTps(double consumeTps) {
+        this.consumeTps = consumeTps;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/admin/OffsetWrapper.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/admin/OffsetWrapper.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/admin/OffsetWrapper.java
new file mode 100644
index 0000000..07785c2
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/admin/OffsetWrapper.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.common.admin;
+
+/**
+ * Mutable value holder for a broker offset, a consumer offset and a timestamp.
+ *
+ * @author shijia.wxr
+ */
+public class OffsetWrapper {
+    private long brokerOffset;
+    private long consumerOffset;
+    private long lastTimestamp;
+
+    /** @return the stored broker offset */
+    public long getBrokerOffset() {
+        return this.brokerOffset;
+    }
+
+    /** @param brokerOffset the broker offset to store */
+    public void setBrokerOffset(long brokerOffset) {
+        this.brokerOffset = brokerOffset;
+    }
+
+    /** @return the stored consumer offset */
+    public long getConsumerOffset() {
+        return this.consumerOffset;
+    }
+
+    /** @param consumerOffset the consumer offset to store */
+    public void setConsumerOffset(long consumerOffset) {
+        this.consumerOffset = consumerOffset;
+    }
+
+    /** @return the stored timestamp */
+    public long getLastTimestamp() {
+        return this.lastTimestamp;
+    }
+
+    /** @param lastTimestamp the timestamp to store */
+    public void setLastTimestamp(long lastTimestamp) {
+        this.lastTimestamp = lastTimestamp;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/admin/RollbackStats.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/admin/RollbackStats.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/admin/RollbackStats.java
new file mode 100644
index 0000000..03d94a2
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/admin/RollbackStats.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.admin;
+
+/**
+ * Mutable value holder for a broker name, a queue id and four offsets
+ * (broker, consumer, timestamp and rollback).
+ *
+ * @author manhong.yqd
+ */
+public class RollbackStats {
+    private String brokerName;
+    private long queueId;
+    private long brokerOffset;
+    private long consumerOffset;
+    private long timestampOffset;
+    private long rollbackOffset;
+
+    /** @return the stored broker name */
+    public String getBrokerName() {
+        return this.brokerName;
+    }
+
+    /** @param brokerName the broker name to store */
+    public void setBrokerName(String brokerName) {
+        this.brokerName = brokerName;
+    }
+
+    /** @return the stored queue id */
+    public long getQueueId() {
+        return this.queueId;
+    }
+
+    /** @param queueId the queue id to store */
+    public void setQueueId(long queueId) {
+        this.queueId = queueId;
+    }
+
+    /** @return the stored broker offset */
+    public long getBrokerOffset() {
+        return this.brokerOffset;
+    }
+
+    /** @param brokerOffset the broker offset to store */
+    public void setBrokerOffset(long brokerOffset) {
+        this.brokerOffset = brokerOffset;
+    }
+
+    /** @return the stored consumer offset */
+    public long getConsumerOffset() {
+        return this.consumerOffset;
+    }
+
+    /** @param consumerOffset the consumer offset to store */
+    public void setConsumerOffset(long consumerOffset) {
+        this.consumerOffset = consumerOffset;
+    }
+
+    /** @return the stored timestamp offset */
+    public long getTimestampOffset() {
+        return this.timestampOffset;
+    }
+
+    /** @param timestampOffset the timestamp offset to store */
+    public void setTimestampOffset(long timestampOffset) {
+        this.timestampOffset = timestampOffset;
+    }
+
+    /** @return the stored rollback offset */
+    public long getRollbackOffset() {
+        return this.rollbackOffset;
+    }
+
+    /** @param rollbackOffset the rollback offset to store */
+    public void setRollbackOffset(long rollbackOffset) {
+        this.rollbackOffset = rollbackOffset;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/admin/TopicOffset.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/admin/TopicOffset.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/admin/TopicOffset.java
new file mode 100644
index 0000000..076d6eb
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/admin/TopicOffset.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.common.admin;
+
+/**
+ * Mutable value holder for a minimum offset, a maximum offset and a last-update timestamp.
+ *
+ * @author shijia.wxr
+ */
+public class TopicOffset {
+    private long minOffset;
+    private long maxOffset;
+    private long lastUpdateTimestamp;
+
+    /** @return the stored minimum offset */
+    public long getMinOffset() {
+        return this.minOffset;
+    }
+
+    /** @param minOffset the minimum offset to store */
+    public void setMinOffset(long minOffset) {
+        this.minOffset = minOffset;
+    }
+
+    /** @return the stored maximum offset */
+    public long getMaxOffset() {
+        return this.maxOffset;
+    }
+
+    /** @param maxOffset the maximum offset to store */
+    public void setMaxOffset(long maxOffset) {
+        this.maxOffset = maxOffset;
+    }
+
+    /** @return the stored last-update timestamp */
+    public long getLastUpdateTimestamp() {
+        return this.lastUpdateTimestamp;
+    }
+
+    /** @param lastUpdateTimestamp the last-update timestamp to store */
+    public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
+        this.lastUpdateTimestamp = lastUpdateTimestamp;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/admin/TopicStatsTable.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/admin/TopicStatsTable.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/admin/TopicStatsTable.java
new file mode 100644
index 0000000..12d1d4b
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/admin/TopicStatsTable.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.common.admin;
+
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.HashMap;
+
+
+/**
+ * Holds a table that maps message queues to their {@link TopicOffset}.
+ *
+ * @author shijia.wxr
+ */
+public class TopicStatsTable extends RemotingSerializable {
+    private HashMap<MessageQueue, TopicOffset> offsetTable = new HashMap<MessageQueue, TopicOffset>();
+
+    /** @return the offset table held by this object (the live map, not a copy) */
+    public HashMap<MessageQueue, TopicOffset> getOffsetTable() {
+        return this.offsetTable;
+    }
+
+    /** @param offsetTable the offset table to hold from now on */
+    public void setOffsetTable(HashMap<MessageQueue, TopicOffset> offsetTable) {
+        this.offsetTable = offsetTable;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/annotation/ImportantField.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/annotation/ImportantField.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/annotation/ImportantField.java
new file mode 100644
index 0000000..fe0cb12
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/annotation/ImportantField.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.common.annotation;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Marker annotation without members. It is retained at runtime, included in generated
+ * documentation, and may be placed on fields, methods, parameters and local variables.
+ */
+@Documented
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER, ElementType.LOCAL_VARIABLE})
+public @interface ImportantField {
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/constant/DBMsgConstants.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/constant/DBMsgConstants.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/constant/DBMsgConstants.java
new file mode 100644
index 0000000..54bc04d
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/constant/DBMsgConstants.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.constant;
+
+/**
+ * Size limit constants for message bodies.
+ */
+public class DBMsgConstants {
+    /**
+     * Maximum body size in bytes: 64 * 1024 * 1024 = 67,108,864 (64 MiB).
+     *
+     * <p>NOTE(review): confirm that 64 MiB, rather than 64 KiB, is the intended limit.
+     */
+    public static final int MAX_BODY_SIZE = 64 * 1024 * 1024;
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/constant/LoggerName.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/constant/LoggerName.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/constant/LoggerName.java
new file mode 100644
index 0000000..9175669
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/constant/LoggerName.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.common.constant;
+
+/**
+ * Collection of string constants, each named {@code *_LOGGER_NAME} and carrying a
+ * {@code Rocketmq}-prefixed value; presumably these are the names used to look up the
+ * loggers of the individual components (confirm against the callers).
+ *
+ * @author shijia.wxr
+ */
+public class LoggerName {
+ public static final String FILTERSRV_LOGGER_NAME = "RocketmqFiltersrv";
+ public static final String NAMESRV_LOGGER_NAME = "RocketmqNamesrv";
+ public static final String BROKER_LOGGER_NAME = "RocketmqBroker";
+ public static final String CLIENT_LOGGER_NAME = "RocketmqClient";
+ public static final String TOOLS_LOGGER_NAME = "RocketmqTools";
+ public static final String COMMON_LOGGER_NAME = "RocketmqCommon";
+ public static final String STORE_LOGGER_NAME = "RocketmqStore";
+ public static final String STORE_ERROR_LOGGER_NAME = "RocketmqStoreError";
+ public static final String TRANSACTION_LOGGER_NAME = "RocketmqTransaction";
+ public static final String REBALANCE_LOCK_LOGGER_NAME = "RocketmqRebalanceLock";
+ public static final String ROCKETMQ_STATS_LOGGER_NAME = "RocketmqStats";
+ public static final String COMMERCIAL_LOGGER_NAME = "RocketmqCommercial";
+ public static final String FLOW_CONTROL_LOGGER_NAME = "RocketmqFlowControl";
+ public static final String ROCKETMQ_AUTHORIZE_LOGGER_NAME = "RocketmqAuthorize";
+ public static final String DUPLICATION_LOGGER_NAME = "RocketmqDuplication";
+ public static final String PROTECTION_LOGGER_NAME = "RocketmqProtection";
+ public static final String WATER_MARK_LOGGER_NAME = "RocketmqWaterMark";
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/constant/PermName.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/constant/PermName.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/constant/PermName.java
new file mode 100644
index 0000000..95c2510
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/constant/PermName.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.common.constant;
+
+/**
+ * Permission bit flags together with helpers that test and render them.
+ *
+ * @author shijia.wxr
+ */
+public class PermName {
+    public static final int PERM_PRIORITY = 0x1 << 3;
+    public static final int PERM_READ = 0x1 << 2;
+    public static final int PERM_WRITE = 0x1 << 1;
+    public static final int PERM_INHERIT = 0x1 << 0;
+
+    /**
+     * Renders the permission bits as a three-character string.
+     *
+     * <p>Position 0 is {@code R} when the read bit is set, position 1 is {@code W} when the
+     * write bit is set and position 2 is {@code X} when the inherit bit is set. A cleared bit
+     * is shown as {@code -}. The priority bit is not rendered.
+     *
+     * @param perm the permission bit mask
+     * @return the three-character rendering, e.g. {@code "RW-"}
+     */
+    public static String perm2String(final int perm) {
+        final char[] flags = {'-', '-', '-'};
+
+        if (isReadable(perm)) {
+            flags[0] = 'R';
+        }
+        if (isWriteable(perm)) {
+            flags[1] = 'W';
+        }
+        if (isInherited(perm)) {
+            flags[2] = 'X';
+        }
+
+        return new String(flags);
+    }
+
+    /**
+     * @param perm the permission bit mask
+     * @return {@code true} when the {@link #PERM_READ} bit is set
+     */
+    public static boolean isReadable(final int perm) {
+        // PERM_READ is a single bit, so "non-zero after masking" means "bit set".
+        return (perm & PERM_READ) != 0;
+    }
+
+    /**
+     * @param perm the permission bit mask
+     * @return {@code true} when the {@link #PERM_WRITE} bit is set
+     */
+    public static boolean isWriteable(final int perm) {
+        return (perm & PERM_WRITE) != 0;
+    }
+
+    /**
+     * @param perm the permission bit mask
+     * @return {@code true} when the {@link #PERM_INHERIT} bit is set
+     */
+    public static boolean isInherited(final int perm) {
+        return (perm & PERM_INHERIT) != 0;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/consumer/ConsumeFromWhere.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/consumer/ConsumeFromWhere.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/consumer/ConsumeFromWhere.java
new file mode 100644
index 0000000..ededc90
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/consumer/ConsumeFromWhere.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.common.consumer;
+
+/**
+ * Named options for where consumption starts. Three of the constants are marked
+ * deprecated; the declaration order is kept as is.
+ *
+ * @author shijia.wxr
+ */
+public enum ConsumeFromWhere {
+    CONSUME_FROM_LAST_OFFSET,
+    @Deprecated CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST,
+    @Deprecated CONSUME_FROM_MIN_OFFSET,
+    @Deprecated CONSUME_FROM_MAX_OFFSET,
+    CONSUME_FROM_FIRST_OFFSET,
+    CONSUME_FROM_TIMESTAMP
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/FilterAPI.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/FilterAPI.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/FilterAPI.java
new file mode 100644
index 0000000..2b26b83
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/FilterAPI.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.common.filter;
+
+import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
+
+import java.net.URL;
+
+
+/**
+ * Static helpers for locating filter class sources and building subscription data.
+ *
+ * @author shijia.wxr
+ */
+public class FilterAPI {
+    /**
+     * Looks up {@code <SimpleName>.java} as a resource through this class's class loader.
+     *
+     * @param className a class name, optionally package-qualified
+     * @return whatever {@link ClassLoader#getResource(String)} returns for that resource name
+     */
+    public static URL classFile(final String className) {
+        return FilterAPI.class.getClassLoader().getResource(simpleClassName(className) + ".java");
+    }
+
+    /**
+     * Strips everything up to and including the last {@code '.'} from a class name.
+     *
+     * @param className a class name, optionally package-qualified
+     * @return the part after the last dot, or {@code className} itself when it has no dot
+     */
+    public static String simpleClassName(final String className) {
+        final int lastDot = className.lastIndexOf(".");
+        return lastDot < 0 ? className : className.substring(lastDot + 1);
+    }
+
+    /**
+     * Builds a {@link SubscriptionData} for a topic from a {@code "||"}-separated tag expression.
+     *
+     * <p>A {@code null}, empty or {@link SubscriptionData#SUB_ALL} expression results in
+     * {@code SUB_ALL} being stored as the sub string. Otherwise each non-blank, trimmed tag is
+     * added to the tag set and its {@code hashCode()} to the code set.
+     *
+     * @param consumerGroup not used by this method
+     * @param topic the topic stored on the result
+     * @param subString the tag expression
+     * @return the populated subscription data
+     * @throws Exception when splitting a non-empty expression yields no elements
+     */
+    public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,
+        String subString) throws Exception {
+        SubscriptionData subscriptionData = new SubscriptionData();
+        subscriptionData.setTopic(topic);
+        subscriptionData.setSubString(subString);
+
+        final boolean subscribeAll =
+            null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0;
+        if (subscribeAll) {
+            subscriptionData.setSubString(SubscriptionData.SUB_ALL);
+            return subscriptionData;
+        }
+
+        String[] tags = subString.split("\\|\\|");
+        if (tags == null || tags.length == 0) {
+            throw new Exception("subString split error");
+        }
+
+        for (String tag : tags) {
+            // Trimming an empty tag yields an empty string, so one length check covers both cases.
+            String trimmed = tag.trim();
+            if (trimmed.length() == 0) {
+                continue;
+            }
+            subscriptionData.getTagsSet().add(trimmed);
+            subscriptionData.getCodeSet().add(trimmed.hashCode());
+        }
+
+        return subscriptionData;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/FilterContext.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/FilterContext.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/FilterContext.java
new file mode 100644
index 0000000..50cc5fc
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/FilterContext.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.filter;
+
+/**
+ * Mutable holder for a consumer group name.
+ */
+public class FilterContext {
+    private String consumerGroup;
+
+    /**
+     * @return the consumer group name, or {@code null} when none has been set
+     */
+    public String getConsumerGroup() {
+        return this.consumerGroup;
+    }
+
+    /**
+     * @param consumerGroup the consumer group name to store
+     */
+    public void setConsumerGroup(final String consumerGroup) {
+        this.consumerGroup = consumerGroup;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/MessageFilter.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/MessageFilter.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/MessageFilter.java
new file mode 100644
index 0000000..8a1252e
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/MessageFilter.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.filter;
+
+import com.alibaba.rocketmq.common.message.MessageExt;
+
+
+/**
+ * Callback that decides whether a message matches, given a filter context.
+ */
+public interface MessageFilter {
+    /**
+     * @param msg the message to test
+     * @param context the filter context supplied with the message
+     * @return the match decision; presumably {@code true} means the message is accepted, but
+     *         the exact contract is defined by implementations
+     */
+    boolean match(MessageExt msg, FilterContext context);
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/impl/Op.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/impl/Op.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/impl/Op.java
new file mode 100644
index 0000000..f83a5f5
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/impl/Op.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.filter.impl;
+
+/**
+ * Base class for expression tokens; every token carries the text it was created from.
+ */
+public abstract class Op {
+
+    private final String symbol;
+
+    /**
+     * @param symbol the token text
+     */
+    protected Op(String symbol) {
+        this.symbol = symbol;
+    }
+
+    /**
+     * @return the token text passed to the constructor
+     */
+    public String getSymbol() {
+        return this.symbol;
+    }
+
+    /**
+     * @return the same value as {@link #getSymbol()}
+     */
+    @Override
+    public String toString() {
+        return this.symbol;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/impl/Operand.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/impl/Operand.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/impl/Operand.java
new file mode 100644
index 0000000..95ca663
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/impl/Operand.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.filter.impl;
+
+/**
+ * An {@link Op} that is not an operator; it adds nothing to the base class beyond its type.
+ */
+public class Operand extends Op {
+    /**
+     * @param symbol the operand text
+     */
+    public Operand(final String symbol) {
+        super(symbol);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/impl/Operator.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/impl/Operator.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/impl/Operator.java
new file mode 100644
index 0000000..c906d72
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/impl/Operator.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.filter.impl;
+
+/**
+ * The fixed set of operator tokens: both parentheses, {@code &&} and {@code ||}. Instances are
+ * only available through the constants below or {@link #createOperator(String)}.
+ */
+public class Operator extends Op {
+
+    public static final Operator LEFTPARENTHESIS = new Operator("(", 30, false);
+    public static final Operator RIGHTPARENTHESIS = new Operator(")", 30, false);
+    public static final Operator AND = new Operator("&&", 20, true);
+    public static final Operator OR = new Operator("||", 15, true);
+
+    private final int priority;
+    private final boolean compareable;
+
+    private Operator(String symbol, int priority, boolean compareable) {
+        super(symbol);
+        this.priority = priority;
+        this.compareable = compareable;
+    }
+
+    /**
+     * Maps an operator symbol to the shared constant for it.
+     *
+     * @param operator the operator text
+     * @return one of {@link #LEFTPARENTHESIS}, {@link #RIGHTPARENTHESIS}, {@link #AND}, {@link #OR}
+     * @throws IllegalArgumentException when the text is none of the four known symbols
+     */
+    public static Operator createOperator(String operator) {
+        final Operator[] known = {LEFTPARENTHESIS, RIGHTPARENTHESIS, AND, OR};
+        for (Operator candidate : known) {
+            if (candidate.getSymbol().equals(operator)) {
+                return candidate;
+            }
+        }
+        throw new IllegalArgumentException("unsupport operator " + operator);
+    }
+
+    /**
+     * @return the numeric priority given at construction
+     */
+    public int getPriority() {
+        return this.priority;
+    }
+
+    /**
+     * @return the comparable flag given at construction ({@code false} for both parentheses)
+     */
+    public boolean isCompareable() {
+        return this.compareable;
+    }
+
+    /**
+     * Compares priorities.
+     *
+     * @param operator the operator to compare against
+     * @return {@code 1}, {@code 0} or {@code -1} when this operator's priority is greater than,
+     *         equal to or less than the other's
+     */
+    public int compare(Operator operator) {
+        if (this.priority < operator.priority) {
+            return -1;
+        }
+        if (this.priority > operator.priority) {
+            return 1;
+        }
+        return 0;
+    }
+
+    /**
+     * @param operator the operator text to test
+     * @return {@code true} when the text equals this operator's symbol
+     */
+    public boolean isSpecifiedOp(String operator) {
+        final String ownSymbol = this.getSymbol();
+        return ownSymbol.equals(operator);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/impl/PolishExpr.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/impl/PolishExpr.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/impl/PolishExpr.java
new file mode 100644
index 0000000..518c45e
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/impl/PolishExpr.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.filter.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Stack;
+
+import static com.alibaba.rocketmq.common.filter.impl.Operator.LEFTPARENTHESIS;
+import static com.alibaba.rocketmq.common.filter.impl.Operator.RIGHTPARENTHESIS;
+import static com.alibaba.rocketmq.common.filter.impl.Operator.createOperator;
+
+/**
+ * Tokenizes a filter expression built from operands, {@code &&}, {@code ||} and parentheses,
+ * and converts it to reverse Polish (postfix) order.
+ */
+public class PolishExpr {
+
+    /**
+     * Tokenizes the expression and converts the tokens to reverse Polish order.
+     *
+     * @param expression the infix expression text
+     * @return the tokens in postfix order
+     * @throws IllegalArgumentException on an illegal character, an unsupported operator or
+     *         mismatched parentheses
+     */
+    public static List<Op> reversePolish(String expression) {
+        return reversePolish(participle(expression));
+    }
+
+    /**
+     * Shunting-yard algorithm <br/>
+     * http://en.wikipedia.org/wiki/Shunting_yard_algorithm
+     *
+     * <p>When an operator arrives and the operator on top of the stack is comparable and the
+     * new operator's priority is not higher, that single stacked operator is moved to the
+     * output before the new one is pushed.
+     *
+     * @param tokens the infix tokens
+     * @return the tokens in postfix order, without parentheses
+     * @throws IllegalArgumentException on mismatched parentheses or a token that is neither an
+     *         {@link Operand} nor an {@link Operator}
+     */
+    public static List<Op> reversePolish(List<Op> tokens) {
+        List<Op> segments = new ArrayList<Op>();
+        Stack<Operator> operatorStack = new Stack<Operator>();
+
+        for (int i = 0; i < tokens.size(); i++) {
+            Op token = tokens.get(i);
+            if (isOperand(token)) {
+                segments.add(token);
+            } else if (isLeftParenthesis(token)) {
+                operatorStack.push((Operator) token);
+            } else if (isRightParenthesis(token)) {
+                // Unwind to the matching "(", which is discarded rather than emitted.
+                Operator opNew = null;
+                while (!operatorStack.empty() && LEFTPARENTHESIS != (opNew = operatorStack.pop())) {
+                    segments.add(opNew);
+                }
+                if (null == opNew || LEFTPARENTHESIS != opNew) {
+                    throw new IllegalArgumentException("mismatched parentheses");
+                }
+            } else if (isOperator(token)) {
+                Operator opNew = (Operator) token;
+                if (!operatorStack.empty()) {
+                    Operator opOld = operatorStack.peek();
+                    if (opOld.isCompareable() && opNew.compare(opOld) != 1) {
+                        segments.add(operatorStack.pop());
+                    }
+                }
+                operatorStack.push(opNew);
+            } else {
+                throw new IllegalArgumentException("illegal token " + token);
+            }
+        }
+
+        while (!operatorStack.empty()) {
+            Operator operator = operatorStack.pop();
+            if (LEFTPARENTHESIS == operator || RIGHTPARENTHESIS == operator) {
+                throw new IllegalArgumentException("mismatched parentheses " + operator);
+            }
+            segments.add(operator);
+        }
+
+        return segments;
+    }
+
+    /**
+     * Splits an expression into operand, operator and parenthesis tokens.
+     *
+     * <p>Operands are runs of ASCII letters, digits and underscores; operators are runs of
+     * {@code &} and {@code |}; spaces and tabs separate tokens and are otherwise ignored.
+     *
+     * <p>NOTE: a word still pending when the input ends is always emitted as an
+     * {@link Operand}, even if it is a run of operator characters.
+     *
+     * @param expression the infix expression text
+     * @return the tokens in the order they appear
+     * @throws IllegalArgumentException on a character outside the sets above, or when an
+     *         operator run is not a supported operator
+     */
+    private static List<Op> participle(String expression) {
+        List<Op> segments = new ArrayList<Op>();
+
+        int size = expression.length();
+        int wordStartIndex = -1;
+        int wordLen = 0;
+        Type preType = Type.NULL;
+
+        for (int i = 0; i < size; i++) {
+            char ch = expression.charAt(i);
+
+            if (isOperandChar(ch)) {
+                // A new operand word starts unless we are already inside one.
+                if (Type.OPERATOR == preType || Type.SEPAERATOR == preType || Type.NULL == preType
+                    || Type.PARENTHESIS == preType) {
+                    if (Type.OPERATOR == preType) {
+                        segments.add(createOperator(expression.substring(wordStartIndex, wordStartIndex
+                            + wordLen)));
+                    }
+                    wordStartIndex = i;
+                    wordLen = 0;
+                }
+                preType = Type.OPERAND;
+                wordLen++;
+            } else if ('(' == ch || ')' == ch) {
+                // A parenthesis ends any pending word and is a token by itself.
+                if (Type.OPERATOR == preType) {
+                    segments.add(createOperator(expression
+                        .substring(wordStartIndex, wordStartIndex + wordLen)));
+                    wordStartIndex = -1;
+                    wordLen = 0;
+                } else if (Type.OPERAND == preType) {
+                    segments.add(new Operand(expression.substring(wordStartIndex, wordStartIndex + wordLen)));
+                    wordStartIndex = -1;
+                    wordLen = 0;
+                }
+
+                preType = Type.PARENTHESIS;
+                segments.add(createOperator(ch + ""));
+            } else if ('&' == ch || '|' == ch) {
+                // A new operator word starts unless we are already inside one.
+                if (Type.OPERAND == preType || Type.SEPAERATOR == preType || Type.PARENTHESIS == preType) {
+                    if (Type.OPERAND == preType) {
+                        segments.add(new Operand(expression.substring(wordStartIndex, wordStartIndex
+                            + wordLen)));
+                    }
+                    wordStartIndex = i;
+                    wordLen = 0;
+                }
+                preType = Type.OPERATOR;
+                wordLen++;
+            } else if (' ' == ch || '\t' == ch) {
+                // Whitespace ends any pending word.
+                if (Type.OPERATOR == preType) {
+                    segments.add(createOperator(expression
+                        .substring(wordStartIndex, wordStartIndex + wordLen)));
+                    wordStartIndex = -1;
+                    wordLen = 0;
+                } else if (Type.OPERAND == preType) {
+                    segments.add(new Operand(expression.substring(wordStartIndex, wordStartIndex + wordLen)));
+                    wordStartIndex = -1;
+                    wordLen = 0;
+                }
+                preType = Type.SEPAERATOR;
+            } else {
+                throw new IllegalArgumentException("illegal expression, at index " + i + " " + ch);
+            }
+        }
+
+        if (wordLen > 0) {
+            segments.add(new Operand(expression.substring(wordStartIndex, wordStartIndex + wordLen)));
+        }
+        return segments;
+    }
+
+    /**
+     * Tells whether a character may appear in an operand: an ASCII letter, a digit
+     * {@code '0'..'9'} or an underscore. The lower digit bound is {@code '0'}; a bound of
+     * {@code '1'} would reject operands such as {@code tag10}.
+     */
+    private static boolean isOperandChar(char ch) {
+        return ('a' <= ch && ch <= 'z') || ('A' <= ch && ch <= 'Z')
+            || ('0' <= ch && ch <= '9') || '_' == ch;
+    }
+
+    /**
+     * @param token the token to test
+     * @return {@code true} when the token is an {@link Operand}
+     */
+    public static boolean isOperand(Op token) {
+        return token instanceof Operand;
+    }
+
+    /**
+     * @param token the token to test
+     * @return {@code true} when the token is the {@code (} operator constant
+     */
+    public static boolean isLeftParenthesis(Op token) {
+        return token instanceof Operator && LEFTPARENTHESIS == (Operator) token;
+    }
+
+    /**
+     * @param token the token to test
+     * @return {@code true} when the token is the {@code )} operator constant
+     */
+    public static boolean isRightParenthesis(Op token) {
+        return token instanceof Operator && RIGHTPARENTHESIS == (Operator) token;
+    }
+
+    /**
+     * @param token the token to test
+     * @return {@code true} when the token is any {@link Operator}, parentheses included
+     */
+    public static boolean isOperator(Op token) {
+        return token instanceof Operator;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/impl/Type.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/impl/Type.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/impl/Type.java
new file mode 100644
index 0000000..1c0b343
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/filter/impl/Type.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.filter.impl;
+
+/**
+ * Kind of the previously scanned character, used as tokenizer state.
+ */
+public enum Type {
+    /** Nothing scanned yet. */
+    NULL,
+    /** Part of an operand word. */
+    OPERAND,
+    /** Part of an operator word. */
+    OPERATOR,
+    /** An opening or closing parenthesis. */
+    PARENTHESIS,
+    /** Whitespace between tokens. */
+    SEPAERATOR
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/help/FAQUrl.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/help/FAQUrl.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/help/FAQUrl.java
new file mode 100644
index 0000000..06a74a6
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/help/FAQUrl.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.common.help;
+
+/**
+ * FAQ page URLs for common error conditions, plus helpers that append such a URL to an
+ * error message.
+ *
+ * @author shijia.wxr
+ */
+public class FAQUrl {
+
+    public static final String APPLY_TOPIC_URL =
+        "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&topic_not_exist";
+
+    public static final String NAME_SERVER_ADDR_NOT_EXIST_URL =
+        "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&namesrv_not_exist";
+
+    public static final String GROUP_NAME_DUPLICATE_URL =
+        "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&group_duplicate";
+
+    // The fragment is the literal text "&parameter_check_failed"; "&para" must not be
+    // decoded as the HTML pilcrow entity.
+    public static final String CLIENT_PARAMETER_CHECK_URL =
+        "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&parameter_check_failed";
+
+    public static final String SUBSCRIPTION_GROUP_NOT_EXIST =
+        "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&subGroup_not_exist";
+
+    public static final String CLIENT_SERVICE_NOT_OK =
+        "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&service_not_ok";
+
+    // FAQ: No route info of this topic, TopicABC
+    public static final String NO_TOPIC_ROUTE_INFO =
+        "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&topic_not_exist";
+
+    public static final String LOAD_JSON_EXCEPTION =
+        "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&load_json_exception";
+
+    public static final String SAME_GROUP_DIFFERENT_TOPIC =
+        "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&subscription_exception";
+
+    public static final String MQLIST_NOT_EXIST =
+        "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&queue_not_exist";
+
+    public static final String UNEXPECTED_EXCEPTION_URL =
+        "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&unexpected_exception";
+
+    public static final String SEND_MSG_FAILED =
+        "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&send_msg_failed";
+
+    public static final String UNKNOWN_HOST_EXCEPTION =
+        "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&unknown_host";
+
+    private static final String TIP_STRING_BEGIN = "\nSee ";
+    private static final String TIP_STRING_END = " for further details.";
+
+    /**
+     * Builds the tip line pointing at a FAQ page.
+     *
+     * @param url the FAQ URL to mention
+     * @return {@code "\nSee <url> for further details."}
+     */
+    public static String suggestTodo(final String url) {
+        StringBuilder sb = new StringBuilder();
+        sb.append(TIP_STRING_BEGIN);
+        sb.append(url);
+        sb.append(TIP_STRING_END);
+        return sb.toString();
+    }
+
+    /**
+     * Appends a pointer to {@link #UNEXPECTED_EXCEPTION_URL} unless the message already
+     * carries a tip produced by {@link #suggestTodo(String)}.
+     *
+     * @param errorMessage the message to extend; may be {@code null}
+     * @return the extended message, or {@code errorMessage} unchanged when it is {@code null}
+     *         or already contains a tip
+     */
+    public static String attachDefaultURL(final String errorMessage) {
+        if (errorMessage != null) {
+            int index = errorMessage.indexOf(TIP_STRING_BEGIN);
+            if (-1 == index) {
+                StringBuilder sb = new StringBuilder();
+                sb.append(errorMessage);
+                sb.append("\n");
+                sb.append("For more information, please visit the url, ");
+                sb.append(UNEXPECTED_EXCEPTION_URL);
+                return sb.toString();
+            }
+        }
+
+        return errorMessage;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/hook/FilterCheckHook.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/hook/FilterCheckHook.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/hook/FilterCheckHook.java
new file mode 100644
index 0000000..f5d9d7e
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/hook/FilterCheckHook.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.hook;
+
+import java.nio.ByteBuffer;
+
+
+/**
+ * Hook consulted to check whether a filter matches the content of a byte buffer.
+ *
+ * @author manhong.yqd
+ */
+public interface FilterCheckHook {
+    /**
+     * @return the name of this hook
+     */
+    String hookName();
+
+    /**
+     * @param isUnitMode unit-mode flag passed through to the hook
+     * @param byteBuffer the buffer to inspect
+     * @return the hook's match decision
+     */
+    boolean isFilterMatched(boolean isUnitMode, ByteBuffer byteBuffer);
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/Message.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/Message.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/Message.java
new file mode 100644
index 0000000..eeb6f52
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/Message.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.common.message;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * A message made of a topic, an integer flag, a binary body and a
+ * string-to-string property map.
+ * <p>
+ * Tags, keys, delay level, the wait-store flag and the buyer id are not
+ * dedicated fields: they are stored as entries of the property map under the
+ * names declared in {@code MessageConst}.
+ *
+ * @author shijia.wxr
+ */
+public class Message implements Serializable {
+    private static final long serialVersionUID = 8445773977080406428L;
+
+    private String topic;
+    private int flag;
+    /** Created lazily by {@link #putProperty} / {@link #getProperty}; may be {@code null} until then. */
+    private Map<String, String> properties;
+    private byte[] body;
+
+    // ------------------------------------------------------------------
+    // Constructors
+    // ------------------------------------------------------------------
+
+    public Message() {
+    }
+
+    public Message(String topic, byte[] body) {
+        this(topic, "", "", 0, body, true);
+    }
+
+    public Message(String topic, String tags, byte[] body) {
+        this(topic, tags, "", 0, body, true);
+    }
+
+    public Message(String topic, String tags, String keys, byte[] body) {
+        this(topic, tags, keys, 0, body, true);
+    }
+
+    /**
+     * Full constructor. Tags and keys are only recorded when they are
+     * non-null and non-empty; the wait-store flag is always recorded.
+     */
+    public Message(String topic, String tags, String keys, int flag, byte[] body, boolean waitStoreMsgOK) {
+        this.topic = topic;
+        this.flag = flag;
+        this.body = body;
+
+        if (tags != null && !tags.isEmpty()) {
+            this.setTags(tags);
+        }
+
+        if (keys != null && !keys.isEmpty()) {
+            this.setKeys(keys);
+        }
+
+        this.setWaitStoreMsgOK(waitStoreMsgOK);
+    }
+
+    // ------------------------------------------------------------------
+    // Raw property access
+    // ------------------------------------------------------------------
+
+    /** Stores a property, creating the backing map on first use. */
+    void putProperty(final String name, final String value) {
+        if (this.properties == null) {
+            this.properties = new HashMap<String, String>();
+        }
+
+        this.properties.put(name, value);
+    }
+
+    /** Removes a property if the backing map exists; otherwise does nothing. */
+    void clearProperty(final String name) {
+        if (this.properties == null) {
+            return;
+        }
+        this.properties.remove(name);
+    }
+
+    /**
+     * Stores a caller-defined property.
+     *
+     * @throws RuntimeException if {@code name} is contained in
+     *                          {@code MessageConst.STRING_HASH_SET}
+     */
+    public void putUserProperty(final String name, final String value) {
+        final boolean reserved = MessageConst.STRING_HASH_SET.contains(name);
+        if (reserved) {
+            throw new RuntimeException(
+                String.format("The Property<%s> is used by system, input another please", name));
+        }
+        this.putProperty(name, value);
+    }
+
+    public String getUserProperty(final String name) {
+        return this.getProperty(name);
+    }
+
+    /**
+     * Looks up a property. Note that this also creates the backing map when
+     * it does not exist yet, so {@link #getProperties()} is non-null afterwards.
+     *
+     * @return the stored value, or {@code null} when absent
+     */
+    public String getProperty(final String name) {
+        if (this.properties == null) {
+            this.properties = new HashMap<String, String>();
+        }
+
+        return this.properties.get(name);
+    }
+
+    public Map<String, String> getProperties() {
+        return properties;
+    }
+
+    void setProperties(Map<String, String> properties) {
+        this.properties = properties;
+    }
+
+    // ------------------------------------------------------------------
+    // Plain fields
+    // ------------------------------------------------------------------
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public int getFlag() {
+        return flag;
+    }
+
+    public void setFlag(int flag) {
+        this.flag = flag;
+    }
+
+    public byte[] getBody() {
+        return body;
+    }
+
+    public void setBody(byte[] body) {
+        this.body = body;
+    }
+
+    // ------------------------------------------------------------------
+    // Property-backed attributes
+    // ------------------------------------------------------------------
+
+    public String getTags() {
+        return this.getProperty(MessageConst.PROPERTY_TAGS);
+    }
+
+    public void setTags(String tags) {
+        this.putProperty(MessageConst.PROPERTY_TAGS, tags);
+    }
+
+    public String getKeys() {
+        return this.getProperty(MessageConst.PROPERTY_KEYS);
+    }
+
+    public void setKeys(String keys) {
+        this.putProperty(MessageConst.PROPERTY_KEYS, keys);
+    }
+
+    /**
+     * Concatenates every key followed by {@code MessageConst.KEY_SEPARATOR},
+     * trims the result and stores it as the keys property.
+     */
+    public void setKeys(Collection<String> keys) {
+        StringBuilder joined = new StringBuilder();
+        for (String key : keys) {
+            joined.append(key).append(MessageConst.KEY_SEPARATOR);
+        }
+
+        this.setKeys(joined.toString().trim());
+    }
+
+    /**
+     * @return the parsed delay level, or {@code 0} when the property is absent
+     */
+    public int getDelayTimeLevel() {
+        String level = this.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
+        return level == null ? 0 : Integer.parseInt(level);
+    }
+
+    public void setDelayTimeLevel(int level) {
+        this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
+    }
+
+    /**
+     * @return the parsed wait-store flag; {@code true} when the property is absent
+     */
+    public boolean isWaitStoreMsgOK() {
+        String stored = this.getProperty(MessageConst.PROPERTY_WAIT_STORE_MSG_OK);
+        return stored == null || Boolean.parseBoolean(stored);
+    }
+
+    public void setWaitStoreMsgOK(boolean waitStoreMsgOK) {
+        this.putProperty(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, Boolean.toString(waitStoreMsgOK));
+    }
+
+    public String getBuyerId() {
+        return getProperty(MessageConst.PROPERTY_BUYER_ID);
+    }
+
+    public void setBuyerId(String buyerId) {
+        putProperty(MessageConst.PROPERTY_BUYER_ID, buyerId);
+    }
+
+    /** Renders topic, flag, properties and the body length (0 for a null body). */
+    @Override
+    public String toString() {
+        final int bodyLength = (body != null) ? body.length : 0;
+        return "Message [topic=" + topic + ", flag=" + flag + ", properties=" + properties + ", body="
+            + bodyLength + "]";
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageAccessor.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageAccessor.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageAccessor.java
new file mode 100644
index 0000000..bbbca1a
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageAccessor.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common.message;
+
+import java.util.Map;
+
+
+/**
+ * Static helpers that reach the package-private property operations of
+ * {@link Message} and provide named getters/setters for a set of
+ * {@code MessageConst} property names.
+ */
+public class MessageAccessor {
+
+    // ---- generic property operations ----
+
+    /** Stores {@code value} under {@code name} in the message's properties. */
+    public static void putProperty(final Message msg, final String name, final String value) {
+        msg.putProperty(name, value);
+    }
+
+    /** Removes the property {@code name} from the message. */
+    public static void clearProperty(final Message msg, final String name) {
+        msg.clearProperty(name);
+    }
+
+    /** Replaces the message's whole property map. */
+    public static void setProperties(final Message msg, Map<String, String> properties) {
+        msg.setProperties(properties);
+    }
+
+    // ---- transfer flag ----
+
+    public static String getTransferFlag(final Message msg) {
+        return msg.getProperty(MessageConst.PROPERTY_TRANSFER_FLAG);
+    }
+
+    public static void setTransferFlag(final Message msg, String unit) {
+        msg.putProperty(MessageConst.PROPERTY_TRANSFER_FLAG, unit);
+    }
+
+    // ---- correction flag ----
+
+    public static String getCorrectionFlag(final Message msg) {
+        return msg.getProperty(MessageConst.PROPERTY_CORRECTION_FLAG);
+    }
+
+    public static void setCorrectionFlag(final Message msg, String unit) {
+        msg.putProperty(MessageConst.PROPERTY_CORRECTION_FLAG, unit);
+    }
+
+    // ---- origin message id ----
+
+    public static String getOriginMessageId(final Message msg) {
+        return msg.getProperty(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID);
+    }
+
+    public static void setOriginMessageId(final Message msg, String originMessageId) {
+        msg.putProperty(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, originMessageId);
+    }
+
+    // ---- MQ2 flag ----
+
+    public static String getMQ2Flag(final Message msg) {
+        return msg.getProperty(MessageConst.PROPERTY_MQ2_FLAG);
+    }
+
+    public static void setMQ2Flag(final Message msg, String flag) {
+        msg.putProperty(MessageConst.PROPERTY_MQ2_FLAG, flag);
+    }
+
+    // ---- reconsume time ----
+
+    public static String getReconsumeTime(final Message msg) {
+        return msg.getProperty(MessageConst.PROPERTY_RECONSUME_TIME);
+    }
+
+    public static void setReconsumeTime(final Message msg, String reconsumeTimes) {
+        msg.putProperty(MessageConst.PROPERTY_RECONSUME_TIME, reconsumeTimes);
+    }
+
+    // ---- max reconsume times ----
+
+    public static String getMaxReconsumeTimes(final Message msg) {
+        return msg.getProperty(MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
+    }
+
+    public static void setMaxReconsumeTimes(final Message msg, String maxReconsumeTimes) {
+        msg.putProperty(MessageConst.PROPERTY_MAX_RECONSUME_TIMES, maxReconsumeTimes);
+    }
+
+    // ---- consume start timestamp ----
+
+    public static String getConsumeStartTimeStamp(final Message msg) {
+        return msg.getProperty(MessageConst.PROPERTY_CONSUME_START_TIMESTAMP);
+    }
+
+    public static void setConsumeStartTimeStamp(final Message msg, String propertyConsumeStartTimeStamp) {
+        msg.putProperty(MessageConst.PROPERTY_CONSUME_START_TIMESTAMP, propertyConsumeStartTimeStamp);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageClientExt.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageClientExt.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageClientExt.java
new file mode 100644
index 0000000..0ab372e
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageClientExt.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.common.message;
+
+/**
+ * A {@code MessageExt} whose {@link #getMsgId()} prefers the unique client ID
+ * stored in the message properties. The ID held by the superclass is exposed
+ * separately as the "offset message id".
+ */
+public class MessageClientExt extends MessageExt {
+
+    /** @return the ID held by the superclass ({@code super.getMsgId()}) */
+    public String getOffsetMsgId() {
+        return super.getMsgId();
+    }
+
+    /** Stores {@code offsetMsgId} as the superclass message ID. */
+    public void setOffsetMsgId(String offsetMsgId) {
+        super.setMsgId(offsetMsgId);
+    }
+
+    /**
+     * Deliberately a no-op: the argument is ignored. The superclass ID can
+     * only be changed through {@link #setOffsetMsgId(String)}.
+     */
+    public void setMsgId(String msgId) {
+        // Intentionally empty.
+    }
+
+    /**
+     * @return the unique client ID read via {@code MessageClientIDSetter.getUniqID},
+     *         or the offset message id when no unique ID is present
+     */
+    @Override
+    public String getMsgId() {
+        final String clientId = MessageClientIDSetter.getUniqID(this);
+        if (clientId != null) {
+            return clientId;
+        }
+        return this.getOffsetMsgId();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageClientIDSetter.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageClientIDSetter.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageClientIDSetter.java
new file mode 100644
index 0000000..82cd3d1
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/message/MessageClientIDSetter.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.common.message;
+
+import com.alibaba.rocketmq.common.UtilAll;
+
+import java.nio.ByteBuffer;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class MessageClientIDSetter {
+ private static final String TOPIC_KEY_SPLITTER = "#";
+ private static final int LEN;
+ private static final String FIX_STRING;
+ private static final AtomicInteger COUNTER;
+ private static long startTime;
+ private static long nextStartTime;
+
+ static {
+ LEN = 4 + 2 + 4 + 4 + 2;
+ ByteBuffer tempBuffer = ByteBuffer.allocate(10);
+ tempBuffer.position(2);
+ tempBuffer.putInt(UtilAll.getPid());
+ tempBuffer.position(0);
+ try {
+ tempBuffer.put(UtilAll.getIP());
+ } catch (Exception e) {
+ tempBuffer.put(createFakeIP());
+ }
+ tempBuffer.position(6);
+ tempBuffer.putInt(MessageClientIDSetter.class.getClassLoader().hashCode()); //4
+ FIX_STRING = UtilAll.bytes2string(tempBuffer.array());
+ setStartTime(System.currentTimeMillis());
+ COUNTER = new AtomicInteger(0);
+ }
+
+ private synchronized static void setStartTime(long millis) {
+ Calendar cal = Calendar.getInstance();
+ cal.setTimeInMillis(millis);
+ cal.set(Calendar.DAY_OF_MONTH, 1);
+ cal.set(Calendar.HOUR, 0);
+ cal.set(Calendar.MINUTE, 0);
+ cal.set(Calendar.SECOND, 0);
+ cal.set(Calendar.MILLISECOND, 0);
+ startTime = cal.getTimeInMillis();
+ cal.add(Calendar.MONTH, 1);
+ nextStartTime = cal.getTimeInMillis();
+ }
+
+ public static Date getNearlyTimeFromID(String msgID) {
+ ByteBuffer buf = ByteBuffer.allocate(8);
+ byte[] bytes = UtilAll.string2bytes(msgID);
+ buf.put((byte) 0);
+ buf.put((byte) 0);
+ buf.put((byte) 0);
+ buf.put((byte) 0);
+ buf.put(bytes, 10, 4);
+ buf.position(0);
+ long spanMS = buf.getLong();
+ Calendar cal = Calendar.getInstance();
+ long now = cal.getTimeInMillis();
+ cal.set(Calendar.DAY_OF_MONTH, 1);
+ cal.set(Calendar.HOUR, 0);
+ cal.set(Calendar.MINUTE, 0);
+ cal.set(Calendar.SECOND, 0);
+ cal.set(Calendar.MILLISECOND, 0);
+ long monStartTime = cal.getTimeInMillis();
+ if (monStartTime + spanMS >= now) {
+ cal.add(Calendar.MONTH, -1);
+ monStartTime = cal.getTimeInMillis();
+ }
+ cal.setTimeInMillis(monStartTime + spanMS);
+ return cal.getTime();
+ }
+
+ public static String getIPStrFromID(String msgID) {
+ byte[] ipBytes = getIPFromID(msgID);
+ return UtilAll.ipToIPv4Str(ipBytes);
+ }
+
+ public static byte[] getIPFromID(String msgID) {
+ byte[] result = new byte[4];
+ byte[] bytes = UtilAll.string2bytes(msgID);
+ System.arraycopy(bytes, 0, result, 0, 4);
+ return result;
+ }
+
+ public static String createUniqID() {
+ StringBuilder sb = new StringBuilder(LEN * 2);
+ sb.append(FIX_STRING);
+ sb.append(UtilAll.bytes2string(createUniqIDBuffer()));
+ return sb.toString();
+ }
+
+
+ private static byte[] createUniqIDBuffer() {
+ ByteBuffer buffer = ByteBuffer.allocate(4 + 2);
+ long current = System.currentTimeMillis();
+ if (current >= nextStartTime) {
+ setStartTime(current);
+ }
+ buffer.position(0);
+ buffer.putInt((int) (System.currentTimeMillis() - startTime));
+ buffer.putShort((short) COUNTER.getAndIncrement());
+ return buffer.array();
+ }
+
+ public static void setUniqID(final Message msg) {
+ if (msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null) {
+ msg.putProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, createUniqID());
+ }
+ }
+
+ public static String getUniqID(final Message msg) {
+ return msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
+ }
+
+ public static byte[] createFakeIP() {
+ ByteBuffer bb = ByteBuffer.allocate(8);
+ bb.putLong(System.currentTimeMillis());
+ bb.position(4);
+ byte[] fakeIP = new byte[4];
+ bb.get(fakeIP);
+ return fakeIP;
+ }
+}
+