You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 02:44:07 UTC
[11/58] [abbrv] [partial] incubator-rocketmq git commit: ROCKETMQ-18
Rename package name from com.alibaba to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/admin/ConsumeStats.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/admin/ConsumeStats.java b/common/src/main/java/org/apache/rocketmq/common/admin/ConsumeStats.java
new file mode 100644
index 0000000..1405299
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/admin/ConsumeStats.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.admin;
+
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+
+/**
+ * Holds a per-queue table of {@link OffsetWrapper} values and a consume TPS figure.
+ *
+ * @author shijia.wxr
+ */
+public class ConsumeStats extends RemotingSerializable {
+    private HashMap<MessageQueue, OffsetWrapper> offsetTable = new HashMap<MessageQueue, OffsetWrapper>();
+    private double consumeTps = 0;
+
+    /** Sums (brokerOffset - consumerOffset) over every entry of the offset table. */
+    public long computeTotalDiff() {
+        long total = 0L;
+
+        for (Iterator<Entry<MessageQueue, OffsetWrapper>> cursor = this.offsetTable.entrySet().iterator();
+             cursor.hasNext();) {
+            // One wrapper per queue carries both offsets.
+            OffsetWrapper wrapper = cursor.next().getValue();
+            total += wrapper.getBrokerOffset() - wrapper.getConsumerOffset();
+        }
+
+        return total;
+    }
+
+    /** Returns the offset table itself, not a copy. */
+    public HashMap<MessageQueue, OffsetWrapper> getOffsetTable() {
+        return this.offsetTable;
+    }
+
+    /** Replaces the offset table with the given map. */
+    public void setOffsetTable(HashMap<MessageQueue, OffsetWrapper> offsetTable) {
+        this.offsetTable = offsetTable;
+    }
+
+    public double getConsumeTps() {
+        return this.consumeTps;
+    }
+
+    public void setConsumeTps(double consumeTps) {
+        this.consumeTps = consumeTps;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/admin/OffsetWrapper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/admin/OffsetWrapper.java b/common/src/main/java/org/apache/rocketmq/common/admin/OffsetWrapper.java
new file mode 100644
index 0000000..00bab0e
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/admin/OffsetWrapper.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.admin;
+
+/**
+ * Mutable holder for a broker offset, a consumer offset and a last timestamp.
+ *
+ * @author shijia.wxr
+ */
+public class OffsetWrapper {
+    private long brokerOffset;
+    private long consumerOffset;
+
+    private long lastTimestamp;
+
+    /** Returns the stored broker offset. */
+    public long getBrokerOffset() {
+        return this.brokerOffset;
+    }
+
+    /** Stores the broker offset. */
+    public void setBrokerOffset(long brokerOffset) {
+        this.brokerOffset = brokerOffset;
+    }
+
+    /** Returns the stored consumer offset. */
+    public long getConsumerOffset() {
+        return this.consumerOffset;
+    }
+
+    /** Stores the consumer offset. */
+    public void setConsumerOffset(long consumerOffset) {
+        this.consumerOffset = consumerOffset;
+    }
+
+    /** Returns the stored last timestamp. */
+    public long getLastTimestamp() {
+        return this.lastTimestamp;
+    }
+
+    /** Stores the last timestamp. */
+    public void setLastTimestamp(long lastTimestamp) {
+        this.lastTimestamp = lastTimestamp;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/admin/RollbackStats.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/admin/RollbackStats.java b/common/src/main/java/org/apache/rocketmq/common/admin/RollbackStats.java
new file mode 100644
index 0000000..5709327
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/admin/RollbackStats.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.admin;
+
+/**
+ * Mutable holder for a broker name, a queue id and four offset values.
+ * @author manhong.yqd
+ */
+public class RollbackStats {
+    private String brokerName;
+    private long queueId;
+    private long brokerOffset;
+    private long consumerOffset;
+    private long timestampOffset;
+    private long rollbackOffset;
+
+    /** Returns the stored broker name (null until set). */
+    public String getBrokerName() {
+        return this.brokerName;
+    }
+
+    /** Stores the broker name. */
+    public void setBrokerName(String brokerName) {
+        this.brokerName = brokerName;
+    }
+
+    /** Returns the stored queue id. */
+    public long getQueueId() {
+        return this.queueId;
+    }
+
+    /** Stores the queue id. */
+    public void setQueueId(long queueId) {
+        this.queueId = queueId;
+    }
+
+    /** Returns the stored broker offset. */
+    public long getBrokerOffset() {
+        return this.brokerOffset;
+    }
+
+    /** Stores the broker offset. */
+    public void setBrokerOffset(long brokerOffset) {
+        this.brokerOffset = brokerOffset;
+    }
+
+    /** Returns the stored consumer offset. */
+    public long getConsumerOffset() {
+        return this.consumerOffset;
+    }
+
+    /** Stores the consumer offset. */
+    public void setConsumerOffset(long consumerOffset) {
+        this.consumerOffset = consumerOffset;
+    }
+
+    /** Returns the stored timestamp offset. */
+    public long getTimestampOffset() {
+        return this.timestampOffset;
+    }
+
+    /** Stores the timestamp offset. */
+    public void setTimestampOffset(long timestampOffset) {
+        this.timestampOffset = timestampOffset;
+    }
+
+    /** Returns the stored rollback offset. */
+    public long getRollbackOffset() {
+        return this.rollbackOffset;
+    }
+
+    /** Stores the rollback offset. */
+    public void setRollbackOffset(long rollbackOffset) {
+        this.rollbackOffset = rollbackOffset;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/admin/TopicOffset.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/admin/TopicOffset.java b/common/src/main/java/org/apache/rocketmq/common/admin/TopicOffset.java
new file mode 100644
index 0000000..d1b36a5
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/admin/TopicOffset.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.admin;
+
+/**
+ * Mutable holder for a minimum offset, a maximum offset and a last-update timestamp.
+ *
+ * @author shijia.wxr
+ */
+public class TopicOffset {
+    private long minOffset;
+    private long maxOffset;
+    private long lastUpdateTimestamp;
+
+    /** Returns the stored minimum offset. */
+    public long getMinOffset() {
+        return this.minOffset;
+    }
+
+    /** Stores the minimum offset. */
+    public void setMinOffset(long minOffset) {
+        this.minOffset = minOffset;
+    }
+
+    /** Returns the stored maximum offset. */
+    public long getMaxOffset() {
+        return this.maxOffset;
+    }
+
+    /** Stores the maximum offset. */
+    public void setMaxOffset(long maxOffset) {
+        this.maxOffset = maxOffset;
+    }
+
+    /** Returns the stored last-update timestamp. */
+    public long getLastUpdateTimestamp() {
+        return this.lastUpdateTimestamp;
+    }
+
+    /** Stores the last-update timestamp. */
+    public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
+        this.lastUpdateTimestamp = lastUpdateTimestamp;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/admin/TopicStatsTable.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/admin/TopicStatsTable.java b/common/src/main/java/org/apache/rocketmq/common/admin/TopicStatsTable.java
new file mode 100644
index 0000000..d8f7e0a
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/admin/TopicStatsTable.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.admin;
+
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.HashMap;
+
+
+/**
+ * Holds a table mapping each {@link MessageQueue} to its {@link TopicOffset}.
+ *
+ * @author shijia.wxr
+ */
+public class TopicStatsTable extends RemotingSerializable {
+    private HashMap<MessageQueue, TopicOffset> offsetTable = new HashMap<MessageQueue, TopicOffset>();
+
+    /** Returns the offset table itself, not a copy. */
+    public HashMap<MessageQueue, TopicOffset> getOffsetTable() {
+        return this.offsetTable;
+    }
+
+    /** Replaces the offset table with the given map. */
+    public void setOffsetTable(HashMap<MessageQueue, TopicOffset> offsetTable) {
+        this.offsetTable = offsetTable;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/annotation/ImportantField.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/annotation/ImportantField.java b/common/src/main/java/org/apache/rocketmq/common/annotation/ImportantField.java
new file mode 100644
index 0000000..952e08e
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/annotation/ImportantField.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.annotation;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Documented
+@Retention(RetentionPolicy.RUNTIME) // retained in class files and visible to reflection
+@Target({ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER, ElementType.LOCAL_VARIABLE})
+public @interface ImportantField { // marker annotation: declares no elements
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/constant/DBMsgConstants.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/constant/DBMsgConstants.java b/common/src/main/java/org/apache/rocketmq/common/constant/DBMsgConstants.java
new file mode 100644
index 0000000..298a427
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/constant/DBMsgConstants.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.constant;
+
+public class DBMsgConstants {
+    public static final int MAX_BODY_SIZE = 64 * 1024 * 1024; // 64 MiB in bytes (third factor was mistyped as 1204)
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java b/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java
new file mode 100644
index 0000000..1942dc8
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.constant;
+
+/** String constants named *_LOGGER_NAME; every value starts with "Rocketmq".
+ * @author shijia.wxr
+ */
+public class LoggerName {
+ public static final String FILTERSRV_LOGGER_NAME = "RocketmqFiltersrv";
+ public static final String NAMESRV_LOGGER_NAME = "RocketmqNamesrv";
+ public static final String BROKER_LOGGER_NAME = "RocketmqBroker";
+ public static final String CLIENT_LOGGER_NAME = "RocketmqClient";
+ public static final String TOOLS_LOGGER_NAME = "RocketmqTools";
+ public static final String COMMON_LOGGER_NAME = "RocketmqCommon";
+ public static final String STORE_LOGGER_NAME = "RocketmqStore";
+ public static final String STORE_ERROR_LOGGER_NAME = "RocketmqStoreError";
+ public static final String TRANSACTION_LOGGER_NAME = "RocketmqTransaction";
+ public static final String REBALANCE_LOCK_LOGGER_NAME = "RocketmqRebalanceLock";
+ public static final String ROCKETMQ_STATS_LOGGER_NAME = "RocketmqStats";
+ public static final String COMMERCIAL_LOGGER_NAME = "RocketmqCommercial";
+ public static final String FLOW_CONTROL_LOGGER_NAME = "RocketmqFlowControl";
+ public static final String ROCKETMQ_AUTHORIZE_LOGGER_NAME = "RocketmqAuthorize";
+ public static final String DUPLICATION_LOGGER_NAME = "RocketmqDuplication";
+ public static final String PROTECTION_LOGGER_NAME = "RocketmqProtection";
+ public static final String WATER_MARK_LOGGER_NAME = "RocketmqWaterMark";
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/constant/PermName.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/constant/PermName.java b/common/src/main/java/org/apache/rocketmq/common/constant/PermName.java
new file mode 100644
index 0000000..ed379ec
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/constant/PermName.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.constant;
+
+/** Permission bit flags with helpers to test them and render them as text.
+ * @author shijia.wxr
+ */
+public class PermName {
+    public static final int PERM_PRIORITY = 0x1 << 3;
+    public static final int PERM_READ = 0x1 << 2;
+    public static final int PERM_WRITE = 0x1 << 1;
+    public static final int PERM_INHERIT = 0x1 << 0;
+
+    /**
+     * Renders the read, write and inherit bits as a three-character string.
+     * "R", "W" and "X" mark a set bit in positions 0, 1 and 2 respectively;
+     * an unset bit is shown as "-".
+     * PERM_PRIORITY is not rendered.
+     *
+     * @param perm permission bit mask
+     * @return text such as "RW-" or "---"
+     */
+    public static String perm2String(final int perm) {
+        final char read = isReadable(perm) ? 'R' : '-';
+        final char write = isWriteable(perm) ? 'W' : '-';
+        final char inherit = isInherited(perm) ? 'X' : '-';
+
+        return new String(new char[] {read, write, inherit});
+    }
+
+    public static boolean isReadable(final int perm) {
+        return (perm & PERM_READ) != 0; // single-bit mask, so non-zero means set
+    }
+
+    public static boolean isWriteable(final int perm) {
+        return (perm & PERM_WRITE) != 0; // single-bit mask, so non-zero means set
+    }
+
+    public static boolean isInherited(final int perm) {
+        return (perm & PERM_INHERIT) != 0; // single-bit mask, so non-zero means set
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/consumer/ConsumeFromWhere.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/consumer/ConsumeFromWhere.java b/common/src/main/java/org/apache/rocketmq/common/consumer/ConsumeFromWhere.java
new file mode 100644
index 0000000..db093a0
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/consumer/ConsumeFromWhere.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.consumer;
+
+/**
+ * Start-position options, judging by the constant names (confirm against the consumer); three are deprecated.
+ * @author shijia.wxr
+ */
+public enum ConsumeFromWhere {
+ CONSUME_FROM_LAST_OFFSET,
+
+ @Deprecated
+ CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST,
+ @Deprecated
+ CONSUME_FROM_MIN_OFFSET,
+ @Deprecated
+ CONSUME_FROM_MAX_OFFSET,
+ CONSUME_FROM_FIRST_OFFSET,
+ CONSUME_FROM_TIMESTAMP,
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/filter/FilterAPI.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/filter/FilterAPI.java b/common/src/main/java/org/apache/rocketmq/common/filter/FilterAPI.java
new file mode 100644
index 0000000..fac48ea
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/filter/FilterAPI.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.filter;
+
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+
+import java.net.URL;
+
+
+/**
+ * Static helpers for locating a filter class source and building tag subscriptions.
+ * @author shijia.wxr
+ */
+public class FilterAPI {
+    /** Looks up "SimpleName.java" through this class's class loader; null when not found. */
+    public static URL classFile(final String className) {
+        return FilterAPI.class.getClassLoader().getResource(simpleClassName(className) + ".java");
+    }
+
+    /** Returns the text after the last '.', or the whole name when it contains no '.'. */
+    public static String simpleClassName(final String className) {
+        final int lastDot = className.lastIndexOf(".");
+        return lastDot < 0 ? className : className.substring(lastDot + 1);
+    }
+
+    /**
+     * Builds the subscription for a topic from a "||"-separated tag expression.
+     * A null, empty or SubscriptionData.SUB_ALL expression is stored as SUB_ALL with no
+     * tags; otherwise each non-blank trimmed tag and its hashCode are recorded.
+     * @param consumerGroup not used by this method
+     * @param topic topic stored on the result
+     * @param subString tag expression, e.g. "TagA || TagB"
+     * @return the populated subscription data
+     * @throws Exception if splitting the expression yields no parts (e.g. "||")
+     */
+    public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,
+        String subString) throws Exception {
+        SubscriptionData subscriptionData = new SubscriptionData();
+        subscriptionData.setTopic(topic);
+        subscriptionData.setSubString(subString);
+        if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {
+            subscriptionData.setSubString(SubscriptionData.SUB_ALL);
+            return subscriptionData;
+        }
+        String[] tags = subString.split("\\|\\|");
+        if (tags == null || tags.length == 0) {
+            throw new Exception("subString split error");
+        }
+        for (String tag : tags) {
+            String trimmed = tag.trim();
+            if (trimmed.length() > 0) {
+                subscriptionData.getTagsSet().add(trimmed);
+                subscriptionData.getCodeSet().add(trimmed.hashCode());
+            }
+        }
+        return subscriptionData;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/filter/FilterContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/filter/FilterContext.java b/common/src/main/java/org/apache/rocketmq/common/filter/FilterContext.java
new file mode 100644
index 0000000..e18fe48
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/filter/FilterContext.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.filter;
+
+public class FilterContext {
+    private String consumerGroup;
+
+    /** Returns the stored consumer group (null until set). */
+    public String getConsumerGroup() {
+        return this.consumerGroup;
+    }
+
+    /** Stores the consumer group. */
+    public void setConsumerGroup(String consumerGroup) {
+        this.consumerGroup = consumerGroup;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/filter/MessageFilter.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/filter/MessageFilter.java b/common/src/main/java/org/apache/rocketmq/common/filter/MessageFilter.java
new file mode 100644
index 0000000..c20e737
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/filter/MessageFilter.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.filter;
+
+import org.apache.rocketmq.common.message.MessageExt;
+
+/** Filter callback; presumably returns true when msg passes for the given context (confirm with implementers). */
+public interface MessageFilter {
+ boolean match(final MessageExt msg, final FilterContext context);
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/filter/impl/Op.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/filter/impl/Op.java b/common/src/main/java/org/apache/rocketmq/common/filter/impl/Op.java
new file mode 100644
index 0000000..af54566
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/filter/impl/Op.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.filter.impl;
+
+/** Common base of Operand and Operator: wraps an immutable symbol string. */
+public abstract class Op {
+    private final String symbol;
+    /** Stores the given symbol; callable only from subclasses. */
+    protected Op(String symbol) {
+        this.symbol = symbol;
+    }
+
+    /** Returns the symbol passed at construction. */
+    public String getSymbol() {
+        return symbol;
+    }
+
+    /** Returns the symbol itself, so an Op prints as its symbol text. */
+    @Override
+    public String toString() {
+        return symbol;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/filter/impl/Operand.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/filter/impl/Operand.java b/common/src/main/java/org/apache/rocketmq/common/filter/impl/Operand.java
new file mode 100644
index 0000000..ce21d90
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/filter/impl/Operand.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.filter.impl;
+
+public class Operand extends Op {
+ /** Creates an operand by passing the given symbol to the Op constructor. */
+ public Operand(String symbol) {
+ super(symbol);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/filter/impl/Operator.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/filter/impl/Operator.java b/common/src/main/java/org/apache/rocketmq/common/filter/impl/Operator.java
new file mode 100644
index 0000000..45bebf0
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/filter/impl/Operator.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.filter.impl;
+
+/**
+ * Operator tokens of the filter expression grammar: the two parentheses and the
+ * logical {@code &&} / {@code ||} operators.
+ * <p>
+ * The constructor is private, so the four constants below are the only
+ * instances; {@link #createOperator(String)} maps a symbol back to its constant.
+ */
+public class Operator extends Op {
+
+    public static final Operator LEFTPARENTHESIS = new Operator("(", 30, false);
+    public static final Operator RIGHTPARENTHESIS = new Operator(")", 30, false);
+    public static final Operator AND = new Operator("&&", 20, true);
+    public static final Operator OR = new Operator("||", 15, true);
+
+    /** Precedence weight; a larger value compares as higher in {@link #compare(Operator)}. */
+    private final int priority;
+    /** Whether this operator takes part in priority comparison (false for parentheses). */
+    private final boolean compareable;
+
+    private Operator(String symbol, int priority, boolean compareable) {
+        super(symbol);
+        this.priority = priority;
+        this.compareable = compareable;
+    }
+
+    /**
+     * Looks up the shared constant whose symbol equals the given text.
+     *
+     * @param operator the operator text, e.g. {@code "&&"}
+     * @return the matching constant
+     * @throws IllegalArgumentException if the text matches none of the known symbols
+     */
+    public static Operator createOperator(String operator) {
+        final Operator[] known = {LEFTPARENTHESIS, RIGHTPARENTHESIS, AND, OR};
+        for (Operator candidate : known) {
+            if (candidate.getSymbol().equals(operator)) {
+                return candidate;
+            }
+        }
+        throw new IllegalArgumentException("unsupport operator " + operator);
+    }
+
+    public int getPriority() {
+        return priority;
+    }
+
+    public boolean isCompareable() {
+        return compareable;
+    }
+
+    /**
+     * Compares this operator's priority with another's.
+     *
+     * @param operator the operator to compare against
+     * @return 1 if this priority is higher, 0 if equal, -1 if lower
+     */
+    public int compare(Operator operator) {
+        if (this.priority == operator.priority) {
+            return 0;
+        }
+        return this.priority > operator.priority ? 1 : -1;
+    }
+
+    /**
+     * @param operator the text to test
+     * @return true if the text equals this operator's symbol
+     */
+    public boolean isSpecifiedOp(String operator) {
+        return this.getSymbol().equals(operator);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/filter/impl/PolishExpr.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/filter/impl/PolishExpr.java b/common/src/main/java/org/apache/rocketmq/common/filter/impl/PolishExpr.java
new file mode 100644
index 0000000..73b51b6
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/filter/impl/PolishExpr.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.filter.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Stack;
+
+import static org.apache.rocketmq.common.filter.impl.Operator.LEFTPARENTHESIS;
+import static org.apache.rocketmq.common.filter.impl.Operator.RIGHTPARENTHESIS;
+import static org.apache.rocketmq.common.filter.impl.Operator.createOperator;
+
+/**
+ * Converts an infix filter expression made of operands, {@code &&}, {@code ||}
+ * and parentheses into reverse Polish (postfix) order.
+ */
+public class PolishExpr {
+
+    /**
+     * Tokenizes the expression and converts the tokens to postfix order.
+     *
+     * @param expression the infix expression; must not be null
+     * @return the tokens in reverse Polish order
+     * @throws IllegalArgumentException on an illegal character, an unknown
+     *         operator word or mismatched parentheses
+     */
+    public static List<Op> reversePolish(String expression) {
+        return reversePolish(participle(expression));
+    }
+
+    /**
+     * Shunting-yard algorithm <br/>
+     * http://en.wikipedia.org/wiki/Shunting_yard_algorithm
+     *
+     * @param tokens the infix token sequence
+     * @return the compute result of Shunting-yard algorithm
+     * @throws IllegalArgumentException on mismatched parentheses or a token that
+     *         is neither an {@link Operand} nor an {@link Operator}
+     */
+    public static List<Op> reversePolish(List<Op> tokens) {
+        List<Op> segments = new ArrayList<Op>();
+        Stack<Operator> operatorStack = new Stack<Operator>();
+
+        for (Op token : tokens) {
+            if (isOperand(token)) {
+                segments.add(token);
+            } else if (isLeftParenthesis(token)) {
+                operatorStack.push((Operator) token);
+            } else if (isRightParenthesis(token)) {
+                // Unwind to the matching "(", emitting every operator on the way.
+                Operator popped = null;
+                while (!operatorStack.empty() && LEFTPARENTHESIS != (popped = operatorStack.pop())) {
+                    segments.add(popped);
+                }
+                if (LEFTPARENTHESIS != popped) {
+                    throw new IllegalArgumentException("mismatched parentheses");
+                }
+            } else if (isOperator(token)) {
+                Operator opNew = (Operator) token;
+                if (!operatorStack.empty()) {
+                    Operator opOld = operatorStack.peek();
+                    // At most one stacked operator is emitted per incoming operator;
+                    // a "(" on top is not compareable and therefore stays put.
+                    if (opOld.isCompareable() && opNew.compare(opOld) != 1) {
+                        segments.add(operatorStack.pop());
+                    }
+                }
+                operatorStack.push(opNew);
+            } else {
+                throw new IllegalArgumentException("illegal token " + token);
+            }
+        }
+
+        while (!operatorStack.empty()) {
+            Operator operator = operatorStack.pop();
+            if (LEFTPARENTHESIS == operator || RIGHTPARENTHESIS == operator) {
+                throw new IllegalArgumentException("mismatched parentheses " + operator);
+            }
+            segments.add(operator);
+        }
+
+        return segments;
+    }
+
+    /**
+     * Splits the expression into operand, operator and parenthesis tokens.
+     * Consecutive word characters form one operand, consecutive {@code &}/{@code |}
+     * characters form one operator word, and spaces/tabs only separate tokens.
+     *
+     * @param expression the infix expression; must not be null
+     * @return the tokens in source order
+     * @throws IllegalArgumentException on an illegal character or an operator
+     *         word that {@code Operator.createOperator} does not recognise
+     */
+    private static List<Op> participle(String expression) {
+        List<Op> segments = new ArrayList<Op>();
+
+        int wordStartIndex = -1;
+        int wordLen = 0;
+        Type preType = Type.NULL;
+
+        for (int i = 0; i < expression.length(); i++) {
+            char ch = expression.charAt(i);
+            Type curType = typeOf(ch);
+            if (null == curType) {
+                throw new IllegalArgumentException("illegal expression, at index " + i + " " + ch);
+            }
+
+            boolean startsOrExtendsWord = Type.OPERAND == curType || Type.OPERATOR == curType;
+            if (startsOrExtendsWord && curType == preType) {
+                // Same kind of word as the previous character: keep extending it.
+                wordLen++;
+            } else {
+                // Kind changed: emit whatever word was pending, then start afresh.
+                addPendingWord(segments, expression, preType, wordStartIndex, wordLen);
+                if (startsOrExtendsWord) {
+                    wordStartIndex = i;
+                    wordLen = 1;
+                } else {
+                    wordStartIndex = -1;
+                    wordLen = 0;
+                    if (Type.PARENTHESIS == curType) {
+                        segments.add(createOperator(String.valueOf(ch)));
+                    }
+                }
+            }
+            preType = curType;
+        }
+
+        // Flush the last word with its real kind, so a trailing operator word is
+        // not mistaken for an operand.
+        addPendingWord(segments, expression, preType, wordStartIndex, wordLen);
+        return segments;
+    }
+
+    /**
+     * Classifies one character of the expression.
+     *
+     * @return the character class, or null if the character is not allowed
+     */
+    private static Type typeOf(char ch) {
+        if (('a' <= ch && ch <= 'z') || ('A' <= ch && ch <= 'Z') || ('0' <= ch && ch <= '9') || '_' == ch) {
+            return Type.OPERAND;
+        }
+        if ('(' == ch || ')' == ch) {
+            return Type.PARENTHESIS;
+        }
+        if ('&' == ch || '|' == ch) {
+            return Type.OPERATOR;
+        }
+        if (' ' == ch || '\t' == ch) {
+            return Type.SEPAERATOR;
+        }
+        return null;
+    }
+
+    /** Appends the pending operand/operator word, if there is one, to the token list. */
+    private static void addPendingWord(List<Op> segments, String expression, Type wordType, int start, int len) {
+        if (len <= 0) {
+            return;
+        }
+        String word = expression.substring(start, start + len);
+        if (Type.OPERATOR == wordType) {
+            segments.add(createOperator(word));
+        } else if (Type.OPERAND == wordType) {
+            segments.add(new Operand(word));
+        }
+    }
+
+    public static boolean isOperand(Op token) {
+        return token instanceof Operand;
+    }
+
+    public static boolean isLeftParenthesis(Op token) {
+        return token instanceof Operator && LEFTPARENTHESIS == (Operator) token;
+    }
+
+    public static boolean isRightParenthesis(Op token) {
+        return token instanceof Operator && RIGHTPARENTHESIS == (Operator) token;
+    }
+
+    public static boolean isOperator(Op token) {
+        return token instanceof Operator;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/filter/impl/Type.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/filter/impl/Type.java b/common/src/main/java/org/apache/rocketmq/common/filter/impl/Type.java
new file mode 100644
index 0000000..834bde8
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/filter/impl/Type.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.filter.impl;
+
+/**
+ * Character classes used by the tokenizer in {@code PolishExpr} to remember what
+ * kind of character it saw last.
+ */
+public enum Type {
+ // Initial state: no character has been consumed yet.
+ NULL,
+ // Letter, digit or underscore: part of an operand word.
+ OPERAND,
+ // '&' or '|': part of an operator word.
+ OPERATOR,
+ // '(' or ')'.
+ PARENTHESIS,
+ // Space or tab between tokens. The spelling is part of the public name.
+ SEPAERATOR;
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/help/FAQUrl.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/help/FAQUrl.java b/common/src/main/java/org/apache/rocketmq/common/help/FAQUrl.java
new file mode 100644
index 0000000..85bef76
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/help/FAQUrl.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.help;
+
+/**
+ * FAQ links attached to client-side error messages, plus helpers that append
+ * such a link to a message.
+ *
+ * @author shijia.wxr
+ */
+public class FAQUrl {
+
+    public static final String APPLY_TOPIC_URL =
+        "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&topic_not_exist";
+
+    public static final String NAME_SERVER_ADDR_NOT_EXIST_URL =
+        "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&namesrv_not_exist";
+
+    public static final String GROUP_NAME_DUPLICATE_URL =
+        "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&group_duplicate";
+
+    // The "&parameter..." suffix must stay spelled out: "&para" is easily
+    // mis-decoded as the HTML pilcrow entity, which corrupts the link.
+    public static final String CLIENT_PARAMETER_CHECK_URL =
+        "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&parameter_check_failed";
+
+    public static final String SUBSCRIPTION_GROUP_NOT_EXIST =
+        "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&subGroup_not_exist";
+
+    public static final String CLIENT_SERVICE_NOT_OK =
+        "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&service_not_ok";
+
+    // FAQ: No route info of this topic, TopicABC
+    public static final String NO_TOPIC_ROUTE_INFO =
+        "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&topic_not_exist";
+
+    public static final String LOAD_JSON_EXCEPTION =
+        "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&load_json_exception";
+
+    public static final String SAME_GROUP_DIFFERENT_TOPIC =
+        "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&subscription_exception";
+
+    public static final String MQLIST_NOT_EXIST =
+        "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&queue_not_exist";
+
+    public static final String UNEXPECTED_EXCEPTION_URL =
+        "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&unexpected_exception";
+
+    public static final String SEND_MSG_FAILED =
+        "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&send_msg_failed";
+
+    public static final String UNKNOWN_HOST_EXCEPTION =
+        "http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&unknown_host";
+
+    private static final String TIP_STRING_BEGIN = "\nSee ";
+    private static final String TIP_STRING_END = " for further details.";
+
+    /**
+     * Builds the "See &lt;url&gt; for further details." suffix for an error message.
+     *
+     * @param url the FAQ link to mention
+     * @return a newline followed by the tip sentence containing the link
+     */
+    public static String suggestTodo(final String url) {
+        return TIP_STRING_BEGIN + url + TIP_STRING_END;
+    }
+
+    /**
+     * Appends the generic FAQ link to a message that does not already carry a
+     * tip produced by {@link #suggestTodo(String)}.
+     *
+     * @param errorMessage the message to extend; may be null
+     * @return the message unchanged if it is null or already contains a tip,
+     *         otherwise the message followed by a line pointing to
+     *         {@link #UNEXPECTED_EXCEPTION_URL}
+     */
+    public static String attachDefaultURL(final String errorMessage) {
+        if (errorMessage == null || errorMessage.contains(TIP_STRING_BEGIN)) {
+            return errorMessage;
+        }
+        return errorMessage + "\n" + "For more information, please visit the url, " + UNEXPECTED_EXCEPTION_URL;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/hook/FilterCheckHook.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/hook/FilterCheckHook.java b/common/src/main/java/org/apache/rocketmq/common/hook/FilterCheckHook.java
new file mode 100644
index 0000000..a99df6e
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/hook/FilterCheckHook.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.hook;
+
+import java.nio.ByteBuffer;
+
+
+/**
+ * Callback contract for a named filter check.
+ *
+ * @author manhong.yqd
+ */
+public interface FilterCheckHook {
+
+    /**
+     * @return the name of this hook
+     */
+    String hookName();
+
+    /**
+     * Reports whether the given buffer matches this hook's filter.
+     *
+     * @param isUnitMode unit-mode flag supplied by the caller; its exact meaning
+     *        is defined by the caller and is not visible here
+     * @param byteBuffer the data to inspect; presumably an encoded message, to be
+     *        confirmed against the caller
+     * @return true if the filter matched
+     */
+    boolean isFilterMatched(final boolean isUnitMode, final ByteBuffer byteBuffer);
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/message/Message.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/Message.java b/common/src/main/java/org/apache/rocketmq/common/message/Message.java
new file mode 100644
index 0000000..c2d2d85
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/message/Message.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.message;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * A message: topic, flag, body bytes and a string-to-string property map.
+ * Tags, keys, delay level, the wait-store flag and the buyer id are all kept
+ * as entries of the property map under {@link MessageConst} keys.
+ *
+ * @author shijia.wxr
+ */
+public class Message implements Serializable {
+    private static final long serialVersionUID = 8445773977080406428L;
+
+    private String topic;
+    private int flag;
+    /** Created lazily by the first property read or write. */
+    private Map<String, String> properties;
+    private byte[] body;
+
+    public Message() {
+    }
+
+    public Message(String topic, byte[] body) {
+        this(topic, "", "", 0, body, true);
+    }
+
+    public Message(String topic, String tags, byte[] body) {
+        this(topic, tags, "", 0, body, true);
+    }
+
+    public Message(String topic, String tags, String keys, byte[] body) {
+        this(topic, tags, keys, 0, body, true);
+    }
+
+    /**
+     * Full constructor. Null or empty tags/keys are not stored as properties;
+     * the wait-store flag is always stored.
+     */
+    public Message(String topic, String tags, String keys, int flag, byte[] body, boolean waitStoreMsgOK) {
+        this.topic = topic;
+        this.flag = flag;
+        this.body = body;
+
+        if (tags != null && !tags.isEmpty()) {
+            this.setTags(tags);
+        }
+
+        if (keys != null && !keys.isEmpty()) {
+            this.setKeys(keys);
+        }
+
+        this.setWaitStoreMsgOK(waitStoreMsgOK);
+    }
+
+    /** Stores a property, creating the map on first use. */
+    void putProperty(final String name, final String value) {
+        if (null == this.properties) {
+            this.properties = new HashMap<String, String>();
+        }
+
+        this.properties.put(name, value);
+    }
+
+    /** Removes a property if the map exists; otherwise does nothing. */
+    void clearProperty(final String name) {
+        if (null != this.properties) {
+            this.properties.remove(name);
+        }
+    }
+
+    /**
+     * Stores a caller-defined property.
+     *
+     * @throws RuntimeException if the name is contained in
+     *         {@code MessageConst.STRING_HASH_SET}
+     */
+    public void putUserProperty(final String name, final String value) {
+        if (MessageConst.STRING_HASH_SET.contains(name)) {
+            throw new RuntimeException(String.format(
+                "The Property<%s> is used by system, input another please", name));
+        }
+        this.putProperty(name, value);
+    }
+
+    public String getUserProperty(final String name) {
+        return this.getProperty(name);
+    }
+
+    /**
+     * Reads a property. Note that this creates the property map if it does not
+     * exist yet.
+     *
+     * @return the stored value, or null if the property is absent
+     */
+    public String getProperty(final String name) {
+        if (null == this.properties) {
+            this.properties = new HashMap<String, String>();
+        }
+
+        return this.properties.get(name);
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public String getTags() {
+        return this.getProperty(MessageConst.PROPERTY_TAGS);
+    }
+
+    public void setTags(String tags) {
+        this.putProperty(MessageConst.PROPERTY_TAGS, tags);
+    }
+
+    public String getKeys() {
+        return this.getProperty(MessageConst.PROPERTY_KEYS);
+    }
+
+    public void setKeys(String keys) {
+        this.putProperty(MessageConst.PROPERTY_KEYS, keys);
+    }
+
+    /**
+     * Joins the keys, each followed by {@code MessageConst.KEY_SEPARATOR}, trims
+     * the result and stores it as the keys property.
+     */
+    public void setKeys(Collection<String> keys) {
+        StringBuilder joined = new StringBuilder();
+        for (String key : keys) {
+            joined.append(key);
+            joined.append(MessageConst.KEY_SEPARATOR);
+        }
+
+        this.setKeys(joined.toString().trim());
+    }
+
+    /**
+     * @return the delay level parsed from its property, or 0 when it is absent
+     */
+    public int getDelayTimeLevel() {
+        String level = this.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
+        if (level == null) {
+            return 0;
+        }
+
+        return Integer.parseInt(level);
+    }
+
+    public void setDelayTimeLevel(int level) {
+        this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
+    }
+
+    /**
+     * @return the wait-store flag; true when the property is absent
+     */
+    public boolean isWaitStoreMsgOK() {
+        String stored = this.getProperty(MessageConst.PROPERTY_WAIT_STORE_MSG_OK);
+        return null == stored || Boolean.parseBoolean(stored);
+    }
+
+    public void setWaitStoreMsgOK(boolean waitStoreMsgOK) {
+        this.putProperty(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, Boolean.toString(waitStoreMsgOK));
+    }
+
+    public int getFlag() {
+        return flag;
+    }
+
+    public void setFlag(int flag) {
+        this.flag = flag;
+    }
+
+    public byte[] getBody() {
+        return body;
+    }
+
+    public void setBody(byte[] body) {
+        this.body = body;
+    }
+
+    /**
+     * @return the live property map (not a copy); null if it was never created
+     */
+    public Map<String, String> getProperties() {
+        return properties;
+    }
+
+    void setProperties(Map<String, String> properties) {
+        this.properties = properties;
+    }
+
+    public String getBuyerId() {
+        return getProperty(MessageConst.PROPERTY_BUYER_ID);
+    }
+
+    public void setBuyerId(String buyerId) {
+        putProperty(MessageConst.PROPERTY_BUYER_ID, buyerId);
+    }
+
+    /** Prints the body as its length only, never its content. */
+    @Override
+    public String toString() {
+        final int bodyLength = body != null ? body.length : 0;
+        return "Message [topic=" + topic + ", flag=" + flag + ", properties=" + properties + ", body="
+            + bodyLength + "]";
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java
new file mode 100644
index 0000000..5cd0ba8
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageAccessor.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.message;
+
+import java.util.Map;
+
+
+/**
+ * Static access to the package-private property operations of {@link Message},
+ * together with typed getters/setters for several {@link MessageConst}
+ * property keys.
+ */
+public class MessageAccessor {
+
+    /** Removes the named property from the message. */
+    public static void clearProperty(final Message msg, final String name) {
+        msg.clearProperty(name);
+    }
+
+    /** Replaces the message's whole property map with the given one. */
+    public static void setProperties(final Message msg, Map<String, String> properties) {
+        msg.setProperties(properties);
+    }
+
+    /** Stores a property on the message without the user-property name check. */
+    public static void putProperty(final Message msg, final String name, final String value) {
+        msg.putProperty(name, value);
+    }
+
+    public static void setTransferFlag(final Message msg, String unit) {
+        msg.putProperty(MessageConst.PROPERTY_TRANSFER_FLAG, unit);
+    }
+
+    public static String getTransferFlag(final Message msg) {
+        return msg.getProperty(MessageConst.PROPERTY_TRANSFER_FLAG);
+    }
+
+    public static void setCorrectionFlag(final Message msg, String unit) {
+        msg.putProperty(MessageConst.PROPERTY_CORRECTION_FLAG, unit);
+    }
+
+    public static String getCorrectionFlag(final Message msg) {
+        return msg.getProperty(MessageConst.PROPERTY_CORRECTION_FLAG);
+    }
+
+    public static void setOriginMessageId(final Message msg, String originMessageId) {
+        msg.putProperty(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, originMessageId);
+    }
+
+    public static String getOriginMessageId(final Message msg) {
+        return msg.getProperty(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID);
+    }
+
+    public static void setMQ2Flag(final Message msg, String flag) {
+        msg.putProperty(MessageConst.PROPERTY_MQ2_FLAG, flag);
+    }
+
+    public static String getMQ2Flag(final Message msg) {
+        return msg.getProperty(MessageConst.PROPERTY_MQ2_FLAG);
+    }
+
+    public static void setReconsumeTime(final Message msg, String reconsumeTimes) {
+        msg.putProperty(MessageConst.PROPERTY_RECONSUME_TIME, reconsumeTimes);
+    }
+
+    public static String getReconsumeTime(final Message msg) {
+        return msg.getProperty(MessageConst.PROPERTY_RECONSUME_TIME);
+    }
+
+    public static void setMaxReconsumeTimes(final Message msg, String maxReconsumeTimes) {
+        msg.putProperty(MessageConst.PROPERTY_MAX_RECONSUME_TIMES, maxReconsumeTimes);
+    }
+
+    public static String getMaxReconsumeTimes(final Message msg) {
+        return msg.getProperty(MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
+    }
+
+    public static void setConsumeStartTimeStamp(final Message msg, String propertyConsumeStartTimeStamp) {
+        msg.putProperty(MessageConst.PROPERTY_CONSUME_START_TIMESTAMP, propertyConsumeStartTimeStamp);
+    }
+
+    public static String getConsumeStartTimeStamp(final Message msg) {
+        return msg.getProperty(MessageConst.PROPERTY_CONSUME_START_TIMESTAMP);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/message/MessageClientExt.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageClientExt.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageClientExt.java
new file mode 100644
index 0000000..90703ca
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageClientExt.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.message;
+
+/**
+ * A {@link MessageExt} whose {@link #getMsgId()} prefers the client-generated
+ * unique id. The id held by the superclass stays reachable through
+ * {@link #getOffsetMsgId()} / {@link #setOffsetMsgId(String)}.
+ */
+public class MessageClientExt extends MessageExt {
+
+    /** Stores the given id in the superclass's message-id field. */
+    public void setOffsetMsgId(String offsetMsgId) {
+        super.setMsgId(offsetMsgId);
+    }
+
+    /** @return the id held in the superclass's message-id field */
+    public String getOffsetMsgId() {
+        return super.getMsgId();
+    }
+
+    /**
+     * Intentionally a no-op: the id reported by {@link #getMsgId()} comes from
+     * the message properties, not from this setter.
+     */
+    @Override
+    public void setMsgId(String msgId) {
+        // deliberately empty
+    }
+
+    /**
+     * @return the unique client id read via {@code MessageClientIDSetter.getUniqID},
+     *         or the offset message id when no unique id is present
+     */
+    @Override
+    public String getMsgId() {
+        final String uniqID = MessageClientIDSetter.getUniqID(this);
+        return uniqID != null ? uniqID : this.getOffsetMsgId();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/message/MessageClientIDSetter.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageClientIDSetter.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageClientIDSetter.java
new file mode 100644
index 0000000..1c3a1b7
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageClientIDSetter.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.message;
+
+import org.apache.rocketmq.common.UtilAll;
+
+import java.nio.ByteBuffer;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Generates client-side unique message ids and reads parts of them back.
+ * <p>
+ * An id is the string form (via {@code UtilAll.bytes2string}) of 16 bytes: a
+ * 10-byte prefix computed once per class load (IP, process id, class-loader
+ * hash), a 4-byte count of milliseconds since the start of the current month,
+ * and a 2-byte counter.
+ */
+public class MessageClientIDSetter {
+    private static final String TOPIC_KEY_SPLITTER = "#";
+    private static final int LEN;
+    private static final String FIX_STRING;
+    private static final AtomicInteger COUNTER;
+    private static long startTime;
+    private static long nextStartTime;
+
+    static {
+        LEN = 4 + 2 + 4 + 4 + 2;
+        ByteBuffer tempBuffer = ByteBuffer.allocate(10);
+        // The pid int is written at offset 2 first; the IP written at offset 0
+        // afterwards overwrites part of it (assuming a 4-byte address), so only
+        // the low-order pid bytes remain at offsets 4-5.
+        tempBuffer.position(2);
+        tempBuffer.putInt(UtilAll.getPid());
+        tempBuffer.position(0);
+        try {
+            tempBuffer.put(UtilAll.getIP());
+        } catch (Exception e) {
+            // Fall back to a pseudo address derived from the clock.
+            tempBuffer.put(createFakeIP());
+        }
+        tempBuffer.position(6);
+        tempBuffer.putInt(MessageClientIDSetter.class.getClassLoader().hashCode()); //4
+        FIX_STRING = UtilAll.bytes2string(tempBuffer.array());
+        setStartTime(System.currentTimeMillis());
+        COUNTER = new AtomicInteger(0);
+    }
+
+    /**
+     * Moves the calendar to 00:00:00.000 on the first day of its current month.
+     * Uses {@code HOUR_OF_DAY}: the 12-hour {@code HOUR} field would leave the
+     * AM/PM marker untouched and yield 12:00 when called in the afternoon.
+     */
+    private static void moveToMonthStart(Calendar cal) {
+        cal.set(Calendar.DAY_OF_MONTH, 1);
+        cal.set(Calendar.HOUR_OF_DAY, 0);
+        cal.set(Calendar.MINUTE, 0);
+        cal.set(Calendar.SECOND, 0);
+        cal.set(Calendar.MILLISECOND, 0);
+    }
+
+    /** Recomputes the start of the month containing {@code millis} and of the following month. */
+    private synchronized static void setStartTime(long millis) {
+        Calendar cal = Calendar.getInstance();
+        cal.setTimeInMillis(millis);
+        moveToMonthStart(cal);
+        startTime = cal.getTimeInMillis();
+        cal.add(Calendar.MONTH, 1);
+        nextStartTime = cal.getTimeInMillis();
+    }
+
+    /**
+     * Estimates when an id was created from the millisecond offset stored in
+     * bytes 10-13 of the decoded id. The offset is relative to a month start
+     * that the id does not record, so the current month is assumed unless that
+     * would place the time in the future, in which case the previous month is
+     * used.
+     *
+     * @param msgID an id in the format produced by {@link #createUniqID()}
+     * @return the estimated creation time
+     */
+    public static Date getNearlyTimeFromID(String msgID) {
+        ByteBuffer buf = ByteBuffer.allocate(8);
+        byte[] bytes = UtilAll.string2bytes(msgID);
+        // Zero the upper four bytes so the 4-byte offset is read as unsigned.
+        buf.put((byte) 0);
+        buf.put((byte) 0);
+        buf.put((byte) 0);
+        buf.put((byte) 0);
+        buf.put(bytes, 10, 4);
+        buf.position(0);
+        long spanMS = buf.getLong();
+        Calendar cal = Calendar.getInstance();
+        long now = cal.getTimeInMillis();
+        moveToMonthStart(cal);
+        long monStartTime = cal.getTimeInMillis();
+        if (monStartTime + spanMS >= now) {
+            cal.add(Calendar.MONTH, -1);
+            monStartTime = cal.getTimeInMillis();
+        }
+        cal.setTimeInMillis(monStartTime + spanMS);
+        return cal.getTime();
+    }
+
+    /** @return the address from {@link #getIPFromID(String)} rendered by {@code UtilAll.ipToIPv4Str} */
+    public static String getIPStrFromID(String msgID) {
+        byte[] ipBytes = getIPFromID(msgID);
+        return UtilAll.ipToIPv4Str(ipBytes);
+    }
+
+    /** @return a copy of the first four bytes of the decoded id */
+    public static byte[] getIPFromID(String msgID) {
+        byte[] result = new byte[4];
+        byte[] bytes = UtilAll.string2bytes(msgID);
+        System.arraycopy(bytes, 0, result, 0, 4);
+        return result;
+    }
+
+    /** @return a new id: the fixed prefix followed by the encoded time offset and counter */
+    public static String createUniqID() {
+        StringBuilder sb = new StringBuilder(LEN * 2);
+        sb.append(FIX_STRING);
+        sb.append(UtilAll.bytes2string(createUniqIDBuffer()));
+        return sb.toString();
+    }
+
+    /** Builds the 6 variable bytes: a 4-byte month-relative millisecond offset and a 2-byte counter. */
+    private static byte[] createUniqIDBuffer() {
+        ByteBuffer buffer = ByteBuffer.allocate(4 + 2);
+        long current = System.currentTimeMillis();
+        if (current >= nextStartTime) {
+            // Crossed into a new month: re-anchor the offset.
+            setStartTime(current);
+        }
+        buffer.position(0);
+        // Reuse the timestamp that was checked against the month boundary.
+        buffer.putInt((int) (current - startTime));
+        buffer.putShort((short) COUNTER.getAndIncrement());
+        return buffer.array();
+    }
+
+    /** Assigns a new unique id to the message unless it already has one. */
+    public static void setUniqID(final Message msg) {
+        if (msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null) {
+            msg.putProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, createUniqID());
+        }
+    }
+
+    /** @return the message's unique id property, or null if none was set */
+    public static String getUniqID(final Message msg) {
+        return msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
+    }
+
+    /** @return the low-order four bytes of the current time in milliseconds */
+    public static byte[] createFakeIP() {
+        ByteBuffer bb = ByteBuffer.allocate(8);
+        bb.putLong(System.currentTimeMillis());
+        bb.position(4);
+        byte[] fakeIP = new byte[4];
+        bb.get(fakeIP);
+        return fakeIP;
+    }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
new file mode 100644
index 0000000..d65160b
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.message;
+
+import java.util.HashSet;
+
+
+/**
+ * Property-name constants for messages, together with a set that contains every one of those names.
+ */
+public class MessageConst {
+    public static final String PROPERTY_KEYS = "KEYS";
+    public static final String PROPERTY_TAGS = "TAGS";
+    public static final String PROPERTY_WAIT_STORE_MSG_OK = "WAIT";
+    public static final String PROPERTY_DELAY_TIME_LEVEL = "DELAY";
+    public static final String PROPERTY_RETRY_TOPIC = "RETRY_TOPIC";
+    public static final String PROPERTY_REAL_TOPIC = "REAL_TOPIC";
+    public static final String PROPERTY_REAL_QUEUE_ID = "REAL_QID";
+    public static final String PROPERTY_TRANSACTION_PREPARED = "TRAN_MSG";
+    public static final String PROPERTY_PRODUCER_GROUP = "PGROUP";
+    public static final String PROPERTY_MIN_OFFSET = "MIN_OFFSET";
+    public static final String PROPERTY_MAX_OFFSET = "MAX_OFFSET";
+    public static final String PROPERTY_BUYER_ID = "BUYER_ID";
+    public static final String PROPERTY_ORIGIN_MESSAGE_ID = "ORIGIN_MESSAGE_ID";
+    public static final String PROPERTY_TRANSFER_FLAG = "TRANSFER_FLAG";
+    public static final String PROPERTY_CORRECTION_FLAG = "CORRECTION_FLAG";
+    public static final String PROPERTY_MQ2_FLAG = "MQ2_FLAG";
+    public static final String PROPERTY_RECONSUME_TIME = "RECONSUME_TIME";
+    public static final String PROPERTY_MSG_REGION = "MSG_REGION";
+    public static final String PROPERTY_TRACE_SWITCH = "TRACE_ON";
+    public static final String PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX = "UNIQ_KEY";
+    public static final String PROPERTY_MAX_RECONSUME_TIMES = "MAX_RECONSUME_TIMES";
+    public static final String PROPERTY_CONSUME_START_TIMESTAMP = "CONSUME_START_TIME";
+
+    /** A single space. NOTE(review): presumably separates multiple keys in one property value; usage is not visible here. */
+    public static final String KEY_SEPARATOR = " ";
+
+    /** Holds every {@code PROPERTY_*} name declared in this class; filled by the static initializer below. */
+    public static final HashSet<String> STRING_HASH_SET = new HashSet<String>();
+
+    static {
+        // Listed in declaration order; the set itself is unordered.
+        final String[] allPropertyNames = {
+            PROPERTY_KEYS,
+            PROPERTY_TAGS,
+            PROPERTY_WAIT_STORE_MSG_OK,
+            PROPERTY_DELAY_TIME_LEVEL,
+            PROPERTY_RETRY_TOPIC,
+            PROPERTY_REAL_TOPIC,
+            PROPERTY_REAL_QUEUE_ID,
+            PROPERTY_TRANSACTION_PREPARED,
+            PROPERTY_PRODUCER_GROUP,
+            PROPERTY_MIN_OFFSET,
+            PROPERTY_MAX_OFFSET,
+            PROPERTY_BUYER_ID,
+            PROPERTY_ORIGIN_MESSAGE_ID,
+            PROPERTY_TRANSFER_FLAG,
+            PROPERTY_CORRECTION_FLAG,
+            PROPERTY_MQ2_FLAG,
+            PROPERTY_RECONSUME_TIME,
+            PROPERTY_MSG_REGION,
+            PROPERTY_TRACE_SWITCH,
+            PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,
+            PROPERTY_MAX_RECONSUME_TIMES,
+            PROPERTY_CONSUME_START_TIMESTAMP,
+        };
+        for (final String propertyName : allPropertyNames) {
+            STRING_HASH_SET.add(propertyName);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
new file mode 100644
index 0000000..4410171
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
@@ -0,0 +1,395 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.message;
+
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.sysflag.MessageSysFlag;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.UnknownHostException;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class MessageDecoder {
+ // Size in bytes of a binary message id. createMessageId(SocketAddress, long) fills it with the
+ // address bytes, a 4-byte port and an 8-byte long (8 + 8 when the address is 4 bytes long).
+ public final static int MSG_ID_LENGTH = 8 + 8;
+
+ // Charset used for topic and properties bytes in encode() and decode().
+ public final static Charset CHARSET_UTF8 = Charset.forName("UTF-8");
+ // Byte offsets of individual fields within an encoded message. They agree with the field order
+ // written by encode(): TOTALSIZE(4) MAGICCODE(4) BODYCRC(4) QUEUEID(4) FLAG(4) QUEUEOFFSET(8)
+ // PHYSICALOFFSET(8) SYSFLAG(4) BORNTIMESTAMP(8) BORNHOST(8) STORETIMESTAMP(8) ...
+ public final static int MESSAGE_MAGIC_CODE_POSTION = 4;
+ public final static int MESSAGE_FLAG_POSTION = 16;
+ public final static int MESSAGE_PHYSIC_OFFSET_POSTION = 28;
+ public final static int MESSAGE_STORE_TIMESTAMP_POSTION = 56;
+ // Value written into the MAGICCODE field by encode(). In Java '+' binds tighter than '^', so this
+ // evaluates as 0xAABBCCDD ^ (1880681586 + 8). Do not "fix" the expression: changing it changes the value.
+ public final static int MESSAGE_MAGIC_CODE = 0xAABBCCDD ^ 1880681586 + 8;
+
+
+ /**
+  * Writes {@code addr} followed by {@code offset} into the caller-supplied scratch buffer and returns the
+  * text form of that buffer's entire backing array.
+  *
+  * @param input  scratch buffer; it is flipped and its limit is set to {@link #MSG_ID_LENGTH} before writing
+  * @param addr   address bytes to copy in first (its remaining bytes are consumed)
+  * @param offset value written as an 8-byte long after the address
+  * @return {@code UtilAll.bytes2string} applied to {@code input.array()}
+  */
+ public static String createMessageId(final ByteBuffer input, final ByteBuffer addr, final long offset) {
+     // Reset the write position to 0 and open exactly MSG_ID_LENGTH bytes for writing.
+     input.flip();
+     input.limit(MSG_ID_LENGTH);
+
+     input.put(addr).putLong(offset);
+
+     final byte[] idBytes = input.array();
+     return UtilAll.bytes2string(idBytes);
+ }
+
+
+ /**
+  * Builds a message id from a socket address and a long value.
+  * <p>
+  * Layout of the {@link #MSG_ID_LENGTH}-byte buffer: raw address bytes, 4-byte port, 8-byte long.
+  *
+  * @param socketAddress         must be an {@link InetSocketAddress} (it is cast unconditionally)
+  * @param transactionIdhashCode value written as the trailing 8 bytes
+  * @return {@code UtilAll.bytes2string} applied to the buffer's backing array
+  */
+ public static String createMessageId(SocketAddress socketAddress, long transactionIdhashCode) {
+     final InetSocketAddress inetAddr = (InetSocketAddress) socketAddress;
+     final ByteBuffer idBuffer = ByteBuffer.allocate(MSG_ID_LENGTH);
+     idBuffer.put(inetAddr.getAddress().getAddress())
+         .putInt(inetAddr.getPort())
+         .putLong(transactionIdhashCode);
+     // array() exposes the whole backing array regardless of position/limit, so no flip is required.
+     return UtilAll.bytes2string(idBuffer.array());
+ }
+
+
+ /**
+  * Splits a textual message id back into an address and an offset.
+  * <p>
+  * Characters [0, 8) are converted to the address bytes, [8, 16) to a 4-byte port and [16, 32) to an
+  * 8-byte offset, each via {@code UtilAll.string2bytes}.
+  *
+  * @param msgId textual message id; must be at least 32 characters long or {@code substring} throws
+  * @return a {@code MessageId} built from the decoded address and offset
+  * @throws UnknownHostException if {@link InetAddress#getByAddress(byte[])} rejects the address bytes
+  */
+ public static MessageId decodeMessageId(final String msgId) throws UnknownHostException {
+     // address + port
+     final byte[] ipBytes = UtilAll.string2bytes(msgId.substring(0, 8));
+     final int port = ByteBuffer.wrap(UtilAll.string2bytes(msgId.substring(8, 16))).getInt(0);
+     final SocketAddress address = new InetSocketAddress(InetAddress.getByAddress(ipBytes), port);
+
+     // offset
+     final long offset = ByteBuffer.wrap(UtilAll.string2bytes(msgId.substring(16, 32))).getLong(0);
+
+     return new MessageId(address, offset);
+ }
+
+
+ /**
+  * Decodes one message, reading and decompressing the body, and producing a plain {@code MessageExt}.
+  *
+  * @param byteBuffer buffer positioned at the start of an encoded message
+  * @return the decoded message, or {@code null} if decoding failed
+  */
+ public static MessageExt decode(java.nio.ByteBuffer byteBuffer) {
+     final boolean readBody = true;
+     final boolean deCompressBody = true;
+     final boolean isClient = false;
+     return decode(byteBuffer, readBody, deCompressBody, isClient);
+ }
+
+ /**
+  * Decodes one message in client mode, i.e. the result is a {@code MessageClientExt}; bodies that are
+  * read are also decompressed.
+  *
+  * @param byteBuffer buffer positioned at the start of an encoded message
+  * @param readBody   whether to read the body or skip over it
+  * @return the decoded message, or {@code null} if decoding failed
+  */
+ public static MessageExt clientDecode(java.nio.ByteBuffer byteBuffer, final boolean readBody) {
+     final boolean deCompressBody = true;
+     final boolean isClient = true;
+     return decode(byteBuffer, readBody, deCompressBody, isClient);
+ }
+
+ /**
+  * Decodes one message into a plain {@code MessageExt}; bodies that are read are also decompressed.
+  *
+  * @param byteBuffer buffer positioned at the start of an encoded message
+  * @param readBody   whether to read the body or skip over it
+  * @return the decoded message, or {@code null} if decoding failed
+  */
+ public static MessageExt decode(java.nio.ByteBuffer byteBuffer, final boolean readBody) {
+     final boolean deCompressBody = true;
+     final boolean isClient = false;
+     return decode(byteBuffer, readBody, deCompressBody, isClient);
+ }
+
+
+ /**
+  * Serializes a message into the binary layout that the {@code decode} methods of this class read back.
+  * <p>
+  * The output buffer is sized from {@code messageExt.getStoreSize()} when that value is positive;
+  * otherwise the size is computed from the fixed-width fields plus the body, topic and properties lengths.
+  *
+  * @param messageExt   message to serialize; its born host and store host are cast to
+  *                     {@link InetSocketAddress}. A {@code null} body is written as a zero-length body.
+  * @param needCompress when {@code true} and the sys flag has {@code MessageSysFlag.COMPRESSED_FLAG} set,
+  *                     the body is passed through {@code UtilAll.compress(body, 5)} before being written
+  * @return the backing array of the buffer the message was written into
+  * @throws IllegalArgumentException if the UTF-8 topic is longer than {@link Byte#MAX_VALUE} bytes or the
+  *                                  encoded properties are longer than {@link Short#MAX_VALUE} bytes; such
+  *                                  lengths do not fit the signed one-byte / two-byte length fields
+  * @throws Exception                declared by the signature; NOTE(review): presumably raised by
+  *                                  {@code UtilAll.compress} — confirm
+  */
+ public static byte[] encode(MessageExt messageExt, boolean needCompress) throws Exception {
+     final int sysFlag = messageExt.getSysFlag();
+
+     // decode() leaves the body null when the stored body length is 0, so accept null as "empty"
+     // instead of failing with a NullPointerException.
+     byte[] body = messageExt.getBody();
+     if (body == null) {
+         body = new byte[0];
+     } else if (needCompress && (sysFlag & MessageSysFlag.COMPRESSED_FLAG) == MessageSysFlag.COMPRESSED_FLAG) {
+         body = UtilAll.compress(body, 5);
+     }
+     final int bodyLength = body.length;
+
+     // The topic length is stored in one signed byte; reject anything that would be truncated.
+     final byte[] topics = messageExt.getTopic().getBytes(CHARSET_UTF8);
+     if (topics.length > Byte.MAX_VALUE) {
+         throw new IllegalArgumentException(
+             "topic is " + topics.length + " bytes long, maximum encodable length is " + Byte.MAX_VALUE);
+     }
+     final byte topicLen = (byte) topics.length;
+
+     // The properties length is stored in one signed short; reject anything that would be truncated.
+     final byte[] propertiesBytes = messageProperties2String(messageExt.getProperties()).getBytes(CHARSET_UTF8);
+     if (propertiesBytes.length > Short.MAX_VALUE) {
+         throw new IllegalArgumentException(
+             "properties are " + propertiesBytes.length + " bytes long, maximum encodable length is " + Short.MAX_VALUE);
+     }
+     final short propertiesLength = (short) propertiesBytes.length;
+
+     int storeSize = messageExt.getStoreSize();
+     if (storeSize <= 0) {
+         storeSize = 4 // 1 TOTALSIZE
+             + 4 // 2 MAGICCODE
+             + 4 // 3 BODYCRC
+             + 4 // 4 QUEUEID
+             + 4 // 5 FLAG
+             + 8 // 6 QUEUEOFFSET
+             + 8 // 7 PHYSICALOFFSET
+             + 4 // 8 SYSFLAG
+             + 8 // 9 BORNTIMESTAMP
+             + 8 // 10 BORNHOST
+             + 8 // 11 STORETIMESTAMP
+             + 8 // 12 STOREHOSTADDRESS
+             + 4 // 13 RECONSUMETIMES
+             + 8 // 14 Prepared Transaction Offset
+             + 4 + bodyLength // 15 BODY
+             + 1 + topics.length // 16 TOPIC
+             + 2 + propertiesBytes.length; // 17 properties
+     }
+     final ByteBuffer byteBuffer = ByteBuffer.allocate(storeSize);
+
+     // 1 TOTALSIZE
+     byteBuffer.putInt(storeSize);
+
+     // 2 MAGICCODE
+     byteBuffer.putInt(MESSAGE_MAGIC_CODE);
+
+     // 3 BODYCRC
+     byteBuffer.putInt(messageExt.getBodyCRC());
+
+     // 4 QUEUEID
+     byteBuffer.putInt(messageExt.getQueueId());
+
+     // 5 FLAG
+     byteBuffer.putInt(messageExt.getFlag());
+
+     // 6 QUEUEOFFSET
+     byteBuffer.putLong(messageExt.getQueueOffset());
+
+     // 7 PHYSICALOFFSET
+     byteBuffer.putLong(messageExt.getCommitLogOffset());
+
+     // 8 SYSFLAG
+     byteBuffer.putInt(sysFlag);
+
+     // 9 BORNTIMESTAMP
+     byteBuffer.putLong(messageExt.getBornTimestamp());
+
+     // 10 BORNHOST
+     final InetSocketAddress bornHost = (InetSocketAddress) messageExt.getBornHost();
+     byteBuffer.put(bornHost.getAddress().getAddress());
+     byteBuffer.putInt(bornHost.getPort());
+
+     // 11 STORETIMESTAMP
+     byteBuffer.putLong(messageExt.getStoreTimestamp());
+
+     // 12 STOREHOST
+     final InetSocketAddress serverHost = (InetSocketAddress) messageExt.getStoreHost();
+     byteBuffer.put(serverHost.getAddress().getAddress());
+     byteBuffer.putInt(serverHost.getPort());
+
+     // 13 RECONSUMETIMES
+     byteBuffer.putInt(messageExt.getReconsumeTimes());
+
+     // 14 Prepared Transaction Offset
+     byteBuffer.putLong(messageExt.getPreparedTransactionOffset());
+
+     // 15 BODY
+     byteBuffer.putInt(bodyLength);
+     byteBuffer.put(body);
+
+     // 16 TOPIC
+     byteBuffer.put(topicLen);
+     byteBuffer.put(topics);
+
+     // 17 properties
+     byteBuffer.putShort(propertiesLength);
+     byteBuffer.put(propertiesBytes);
+
+     return byteBuffer.array();
+ }
+
+ /**
+  * Decodes one message into a plain {@code MessageExt}.
+  *
+  * @param byteBuffer     buffer positioned at the start of an encoded message
+  * @param readBody       whether to read the body or skip over it
+  * @param deCompressBody whether a body flagged as compressed is decompressed after reading
+  * @return the decoded message, or {@code null} if decoding failed
+  */
+ public static MessageExt decode(
+     java.nio.ByteBuffer byteBuffer, final boolean readBody, final boolean deCompressBody) {
+     final boolean isClient = false;
+     return decode(byteBuffer, readBody, deCompressBody, isClient);
+ }
+
+ /**
+  * Decodes one message from the buffer's current position, following the field order written by
+  * {@code encode}.
+  * <p>
+  * Decoding is best-effort: any exception raised while reading (for example a
+  * {@link BufferUnderflowException} on truncated input or an {@link UnknownHostException} on bad address
+  * bytes) is swallowed, the buffer's position is moved to its limit, and {@code null} is returned.
+  *
+  * @param byteBuffer     buffer positioned at the start of an encoded message; advanced past the message
+  *                       on success
+  * @param readBody       when {@code false} the body bytes are skipped and the message body stays unset
+  * @param deCompressBody when {@code true}, a body whose sys flag has
+  *                       {@code MessageSysFlag.COMPRESSED_FLAG} set is passed through
+  *                       {@code UtilAll.uncompress}
+  * @param isClient       when {@code true} a {@code MessageClientExt} is created and its offset message id
+  *                       is set as well
+  * @return the decoded message, or {@code null} if anything went wrong
+  */
+ public static MessageExt decode(
+     java.nio.ByteBuffer byteBuffer, final boolean readBody, final boolean deCompressBody, final boolean isClient) {
+     try {
+         final MessageExt msgExt = isClient ? new MessageClientExt() : new MessageExt();
+
+         // 1 TOTALSIZE
+         msgExt.setStoreSize(byteBuffer.getInt());
+
+         // 2 MAGICCODE (read only to advance the buffer; the value is not checked)
+         byteBuffer.getInt();
+
+         // 3 BODYCRC
+         msgExt.setBodyCRC(byteBuffer.getInt());
+
+         // 4 QUEUEID
+         msgExt.setQueueId(byteBuffer.getInt());
+
+         // 5 FLAG
+         msgExt.setFlag(byteBuffer.getInt());
+
+         // 6 QUEUEOFFSET
+         msgExt.setQueueOffset(byteBuffer.getLong());
+
+         // 7 PHYSICALOFFSET
+         msgExt.setCommitLogOffset(byteBuffer.getLong());
+
+         // 8 SYSFLAG
+         final int sysFlag = byteBuffer.getInt();
+         msgExt.setSysFlag(sysFlag);
+
+         // 9 BORNTIMESTAMP
+         msgExt.setBornTimestamp(byteBuffer.getLong());
+
+         // 10 BORNHOST
+         final byte[] bornHost = new byte[4];
+         byteBuffer.get(bornHost, 0, 4);
+         int port = byteBuffer.getInt();
+         msgExt.setBornHost(new InetSocketAddress(InetAddress.getByAddress(bornHost), port));
+
+         // 11 STORETIMESTAMP
+         msgExt.setStoreTimestamp(byteBuffer.getLong());
+
+         // 12 STOREHOST
+         final byte[] storeHost = new byte[4];
+         byteBuffer.get(storeHost, 0, 4);
+         port = byteBuffer.getInt();
+         msgExt.setStoreHost(new InetSocketAddress(InetAddress.getByAddress(storeHost), port));
+
+         // 13 RECONSUMETIMES
+         msgExt.setReconsumeTimes(byteBuffer.getInt());
+
+         // 14 Prepared Transaction Offset
+         msgExt.setPreparedTransactionOffset(byteBuffer.getLong());
+
+         // 15 BODY
+         final int bodyLen = byteBuffer.getInt();
+         if (bodyLen > 0) {
+             if (readBody) {
+                 byte[] body = new byte[bodyLen];
+                 byteBuffer.get(body);
+
+                 // uncompress body
+                 if (deCompressBody && (sysFlag & MessageSysFlag.COMPRESSED_FLAG) == MessageSysFlag.COMPRESSED_FLAG) {
+                     body = UtilAll.uncompress(body);
+                 }
+
+                 msgExt.setBody(body);
+             } else {
+                 byteBuffer.position(byteBuffer.position() + bodyLen);
+             }
+         }
+
+         // 16 TOPIC: the length occupies a single byte. Read it as unsigned so that lengths 128..255
+         // are not misread as negative (which used to abort decoding of the whole message).
+         final int topicLen = byteBuffer.get() & 0xFF;
+         final byte[] topic = new byte[topicLen];
+         byteBuffer.get(topic);
+         msgExt.setTopic(new String(topic, CHARSET_UTF8));
+
+         // 17 properties
+         final short propertiesLength = byteBuffer.getShort();
+         if (propertiesLength > 0) {
+             final byte[] properties = new byte[propertiesLength];
+             byteBuffer.get(properties);
+             final String propertiesString = new String(properties, CHARSET_UTF8);
+             final Map<String, String> map = string2messageProperties(propertiesString);
+             msgExt.setProperties(map);
+         }
+
+         // The message id is derived from the store host and the commit log offset.
+         final ByteBuffer byteBufferMsgId = ByteBuffer.allocate(MSG_ID_LENGTH);
+         final String msgId = createMessageId(byteBufferMsgId, msgExt.getStoreHostBytes(), msgExt.getCommitLogOffset());
+         msgExt.setMsgId(msgId);
+
+         if (isClient) {
+             ((MessageClientExt) msgExt).setOffsetMsgId(msgId);
+         }
+
+         return msgExt;
+     } catch (Exception e) {
+         // Deliberate best-effort handling: mark the buffer as fully consumed so callers looping on
+         // hasRemaining() terminate, and signal the failure with a null return.
+         byteBuffer.position(byteBuffer.limit());
+     }
+
+     return null;
+ }
+
+
+ /**
+  * Decodes every message in the buffer, including bodies.
+  *
+  * @param byteBuffer buffer holding zero or more consecutive encoded messages
+  * @return the messages decoded before the buffer ran out or a decode failed
+  */
+ public static List<MessageExt> decodes(java.nio.ByteBuffer byteBuffer) {
+     final boolean readBody = true;
+     return decodes(byteBuffer, readBody);
+ }
+
+ /**
+  * Repeatedly applies {@link #clientDecode(java.nio.ByteBuffer, boolean)} until the buffer is exhausted
+  * or a message fails to decode.
+  *
+  * @param byteBuffer buffer holding zero or more consecutive encoded messages
+  * @param readBody   whether message bodies are read or skipped
+  * @return the successfully decoded messages, in buffer order; never {@code null}
+  */
+ public static List<MessageExt> decodes(java.nio.ByteBuffer byteBuffer, final boolean readBody) {
+     final List<MessageExt> decodedMessages = new ArrayList<MessageExt>();
+     while (byteBuffer.hasRemaining()) {
+         final MessageExt decoded = clientDecode(byteBuffer, readBody);
+         if (decoded == null) {
+             // Stop at the first message that cannot be decoded.
+             break;
+         }
+         decodedMessages.add(decoded);
+     }
+     return decodedMessages;
+ }
+
+ // Control character U+0001, written between a property's name and its value by messageProperties2String.
+ public static final char NAME_VALUE_SEPARATOR = 1;
+ // Control character U+0002, written after every name/value pair by messageProperties2String.
+ public static final char PROPERTY_SEPARATOR = 2;
+
+
+ /**
+  * Flattens a property map into one string: for each entry the name, {@link #NAME_VALUE_SEPARATOR},
+  * the value and {@link #PROPERTY_SEPARATOR} are appended, in the map's iteration order.
+  *
+  * @param properties properties to flatten; may be {@code null}
+  * @return the flattened form, or an empty string when {@code properties} is {@code null} or empty
+  */
+ public static String messageProperties2String(Map<String, String> properties) {
+     if (properties == null) {
+         return "";
+     }
+     final StringBuilder flattened = new StringBuilder();
+     for (final Map.Entry<String, String> property : properties.entrySet()) {
+         flattened.append(property.getKey())
+             .append(NAME_VALUE_SEPARATOR)
+             .append(property.getValue())
+             .append(PROPERTY_SEPARATOR);
+     }
+     return flattened.toString();
+ }
+
+ /**
+  * Parses the flattened form produced by {@link #messageProperties2String(Map)} back into a map.
+  * <p>
+  * The input is split on {@link #PROPERTY_SEPARATOR}, and each piece on {@link #NAME_VALUE_SEPARATOR}.
+  * Only pieces that split into exactly two parts are kept; because {@link String#split(String)} discards
+  * trailing empty strings, an entry whose value is empty is skipped.
+  *
+  * @param properties flattened properties; may be {@code null}
+  * @return a new mutable map, empty when {@code properties} is {@code null}
+  */
+ public static Map<String, String> string2messageProperties(final String properties) {
+     final Map<String, String> parsed = new HashMap<String, String>();
+     if (properties == null) {
+         return parsed;
+     }
+
+     final String pairDelimiter = String.valueOf(PROPERTY_SEPARATOR);
+     final String nameValueDelimiter = String.valueOf(NAME_VALUE_SEPARATOR);
+     for (final String pair : properties.split(pairDelimiter)) {
+         final String[] nameAndValue = pair.split(nameValueDelimiter);
+         if (nameAndValue.length == 2) {
+             parsed.put(nameAndValue[0], nameAndValue[1]);
+         }
+     }
+     return parsed;
+ }
+}