You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/08/01 07:23:24 UTC
[26/50] [abbrv] incubator-rocketmq git commit: [ROCKETMQ-121]Support
message filtering based on SQL92 closes apache/incubator-rocketmq#82
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/common/src/main/java/org/apache/rocketmq/common/message/Message.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/Message.java b/common/src/main/java/org/apache/rocketmq/common/message/Message.java
index f3bff83..2c81f5c 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/Message.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/Message.java
@@ -81,6 +81,12 @@ public class Message implements Serializable {
throw new RuntimeException(String.format(
"The Property<%s> is used by system, input another please", name));
}
+        // Reject null or blank names/values. Blankness is checked on the string content with
+        // trim().isEmpty(); "==" on strings only compares references and would let blank
+        // strings that are not the interned "" literal slip through.
+        if (value == null || value.trim().isEmpty()
+            || name == null || name.trim().isEmpty()) {
+            throw new IllegalArgumentException(
+                "The name or value of property can not be null or blank string!"
+            );
+        }
this.putProperty(name, value);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
index 90b837a..e41ec9d 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
@@ -41,6 +41,20 @@ public class MessageDecoder {
public final static int MESSAGE_MAGIC_CODE = 0xAABBCCDD ^ 1880681586 + 8;
public static final char NAME_VALUE_SEPARATOR = 1;
public static final char PROPERTY_SEPARATOR = 2;
+ /**
+ * Byte offset at which decodeProperties reads the 4-byte body length of an encoded message.
+ * It is the sum of the sizes of the 14 fields listed below (84 bytes in total).
+ */
+ public static final int BODY_SIZE_POSITION = 4 // 1 TOTALSIZE
+ + 4 // 2 MAGICCODE
+ + 4 // 3 BODYCRC
+ + 4 // 4 QUEUEID
+ + 4 // 5 FLAG
+ + 8 // 6 QUEUEOFFSET
+ + 8 // 7 PHYSICALOFFSET
+ + 4 // 8 SYSFLAG
+ + 8 // 9 BORNTIMESTAMP
+ + 8 // 10 BORNHOST
+ + 8 // 11 STORETIMESTAMP
+ + 8 // 12 STOREHOSTADDRESS
+ + 4 // 13 RECONSUMETIMES
+ + 8; // 14 Prepared Transaction Offset
public static String createMessageId(final ByteBuffer input, final ByteBuffer addr, final long offset) {
input.flip();
@@ -80,6 +94,31 @@ public class MessageDecoder {
return new MessageId(address, offset);
}
+    /**
+     * Decodes only the properties section of an encoded message, skipping everything else.
+     * <p>
+     * The body length stored at {@link #BODY_SIZE_POSITION} is used to step over the body, the
+     * one-byte topic length to step over the topic, and the following two bytes give the length
+     * of the properties. Those lookups use absolute reads; afterwards the buffer position is
+     * moved to the first properties byte (also when there are no properties to read).
+     *
+     * @param byteBuffer msg commit log buffer.
+     * @return the decoded properties, or {@code null} when the stored properties length is not
+     * greater than zero.
+     */
+    public static Map<String, String> decodeProperties(java.nio.ByteBuffer byteBuffer) {
+        final int bodyLength = byteBuffer.getInt(BODY_SIZE_POSITION);
+        final int topicLengthOffset = BODY_SIZE_POSITION + 4 + bodyLength;
+
+        final byte topicLen = byteBuffer.get(topicLengthOffset);
+        final int propertiesLengthOffset = topicLengthOffset + 1 + topicLen;
+
+        final short propsLen = byteBuffer.getShort(propertiesLengthOffset);
+
+        // Skip the two length bytes so the relative read below starts at the properties.
+        byteBuffer.position(propertiesLengthOffset + 2);
+
+        if (propsLen <= 0) {
+            return null;
+        }
+
+        final byte[] raw = new byte[propsLen];
+        byteBuffer.get(raw);
+        return string2messageProperties(new String(raw, CHARSET_UTF8));
+    }
+
public static MessageExt decode(java.nio.ByteBuffer byteBuffer) {
return decode(byteBuffer, true, true, false);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/common/src/main/java/org/apache/rocketmq/common/namesrv/TopAddressing.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/namesrv/TopAddressing.java b/common/src/main/java/org/apache/rocketmq/common/namesrv/TopAddressing.java
index 74fd965..990e748 100644
--- a/common/src/main/java/org/apache/rocketmq/common/namesrv/TopAddressing.java
+++ b/common/src/main/java/org/apache/rocketmq/common/namesrv/TopAddressing.java
@@ -88,7 +88,7 @@ public class TopAddressing {
if (verbose) {
String errorMsg =
- "connect to " + url + " failed, maybe the domain name " + MixAll.WS_DOMAIN_NAME + " not bind in /etc/hosts";
+ "connect to " + url + " failed, maybe the domain name " + MixAll.getWSAddr() + " not bind in /etc/hosts";
errorMsg += FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL);
log.warn(errorMsg);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
index c6b0925..6f132f7 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
@@ -68,6 +68,8 @@ public class RequestCode {
public static final int GET_ALL_DELAY_OFFSET = 45;
+ public static final int CHECK_CLIENT_CONFIG = 46;
+
public static final int PUT_KV_CONFIG = 100;
public static final int GET_KV_CONFIG = 101;
@@ -162,4 +164,6 @@ public class RequestCode {
public static final int SEND_BATCH_MESSAGE = 320;
+
+ public static final int QUERY_CONSUME_QUEUE = 321;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
index 90b182b..f62c4ea 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
@@ -53,6 +53,10 @@ public class ResponseCode extends RemotingSysResponseCode {
public static final int SUBSCRIPTION_GROUP_NOT_EXIST = 26;
+ public static final int FILTER_DATA_NOT_EXIST = 27;
+
+ public static final int FILTER_DATA_NOT_LATEST = 28;
+
public static final int TRANSACTION_SHOULD_COMMIT = 200;
public static final int TRANSACTION_SHOULD_ROLLBACK = 201;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/common/src/main/java/org/apache/rocketmq/common/protocol/body/CheckClientRequestBody.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/CheckClientRequestBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/CheckClientRequestBody.java
new file mode 100644
index 0000000..a78ce55
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/CheckClientRequestBody.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.body;
+
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+/**
+ * Request body that bundles a client id, a group name and one {@link SubscriptionData}.
+ * Plain mutable bean; every field is {@code null} until its setter is called.
+ */
+public class CheckClientRequestBody extends RemotingSerializable {
+
+    private String clientId;
+    private String group;
+    private SubscriptionData subscriptionData;
+
+    /** @return the client id, or {@code null} if none was set. */
+    public String getClientId() {
+        return this.clientId;
+    }
+
+    /** @param clientId the client id to carry in this body. */
+    public void setClientId(final String clientId) {
+        this.clientId = clientId;
+    }
+
+    /** @return the group name, or {@code null} if none was set. */
+    public String getGroup() {
+        return this.group;
+    }
+
+    /** @param group the group name to carry in this body. */
+    public void setGroup(final String group) {
+        this.group = group;
+    }
+
+    /** @return the subscription data, or {@code null} if none was set. */
+    public SubscriptionData getSubscriptionData() {
+        return this.subscriptionData;
+    }
+
+    /** @param subscriptionData the subscription data to carry in this body. */
+    public void setSubscriptionData(final SubscriptionData subscriptionData) {
+        this.subscriptionData = subscriptionData;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeQueueData.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeQueueData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeQueueData.java
new file mode 100644
index 0000000..7268dcd
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumeQueueData.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.body;
+
+/**
+ * Mutable value holder describing one consume queue entry: physical offset and size, tags code,
+ * plus optional extend data (JSON text), bit map text, an evaluation flag and a message string.
+ */
+public class ConsumeQueueData {
+
+    private long physicOffset;
+    private int physicSize;
+    private long tagsCode;
+    private String extendDataJson;
+    private String bitMap;
+    private boolean eval;
+    private String msg;
+
+    /** @return the physical offset. */
+    public long getPhysicOffset() {
+        return this.physicOffset;
+    }
+
+    /** @param physicOffset the physical offset. */
+    public void setPhysicOffset(final long physicOffset) {
+        this.physicOffset = physicOffset;
+    }
+
+    /** @return the physical size. */
+    public int getPhysicSize() {
+        return this.physicSize;
+    }
+
+    /** @param physicSize the physical size. */
+    public void setPhysicSize(final int physicSize) {
+        this.physicSize = physicSize;
+    }
+
+    /** @return the tags code. */
+    public long getTagsCode() {
+        return this.tagsCode;
+    }
+
+    /** @param tagsCode the tags code. */
+    public void setTagsCode(final long tagsCode) {
+        this.tagsCode = tagsCode;
+    }
+
+    /** @return the extend data as a JSON string, or {@code null} if unset. */
+    public String getExtendDataJson() {
+        return this.extendDataJson;
+    }
+
+    /** @param extendDataJson the extend data as a JSON string. */
+    public void setExtendDataJson(final String extendDataJson) {
+        this.extendDataJson = extendDataJson;
+    }
+
+    /** @return the bit map text, or {@code null} if unset. */
+    public String getBitMap() {
+        return this.bitMap;
+    }
+
+    /** @param bitMap the bit map text. */
+    public void setBitMap(final String bitMap) {
+        this.bitMap = bitMap;
+    }
+
+    /** @return the evaluation flag. */
+    public boolean isEval() {
+        return this.eval;
+    }
+
+    /** @param eval the evaluation flag. */
+    public void setEval(final boolean eval) {
+        this.eval = eval;
+    }
+
+    /** @return the message string, or {@code null} if unset. */
+    public String getMsg() {
+        return this.msg;
+    }
+
+    /** @param msg the message string. */
+    public void setMsg(final String msg) {
+        this.msg = msg;
+    }
+
+    /**
+     * Renders all fields as {@code ConsumeQueueData{name=value, ...}}; the three string fields
+     * are wrapped in single quotes.
+     */
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder("ConsumeQueueData{");
+        text.append("physicOffset=").append(physicOffset);
+        text.append(", physicSize=").append(physicSize);
+        text.append(", tagsCode=").append(tagsCode);
+        text.append(", extendDataJson='").append(extendDataJson).append('\'');
+        text.append(", bitMap='").append(bitMap).append('\'');
+        text.append(", eval=").append(eval);
+        text.append(", msg='").append(msg).append('\'');
+        text.append('}');
+        return text.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryConsumeQueueResponseBody.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryConsumeQueueResponseBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryConsumeQueueResponseBody.java
new file mode 100644
index 0000000..be93da9
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/QueryConsumeQueueResponseBody.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.body;
+
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.List;
+
+/**
+ * Response body holding a {@link SubscriptionData}, a filter data string, a list of
+ * {@link ConsumeQueueData} entries and the maximum/minimum queue index.
+ * Plain mutable bean; object fields are {@code null} and indexes are {@code 0} until set.
+ */
+public class QueryConsumeQueueResponseBody extends RemotingSerializable {
+
+    private SubscriptionData subscriptionData;
+    private String filterData;
+    private List<ConsumeQueueData> queueData;
+    private long maxQueueIndex;
+    private long minQueueIndex;
+
+    /** @return the subscription data, or {@code null} if unset. */
+    public SubscriptionData getSubscriptionData() {
+        return this.subscriptionData;
+    }
+
+    /** @param subscriptionData the subscription data. */
+    public void setSubscriptionData(final SubscriptionData subscriptionData) {
+        this.subscriptionData = subscriptionData;
+    }
+
+    /** @return the filter data string, or {@code null} if unset. */
+    public String getFilterData() {
+        return this.filterData;
+    }
+
+    /** @param filterData the filter data string. */
+    public void setFilterData(final String filterData) {
+        this.filterData = filterData;
+    }
+
+    /** @return the queue entries (the stored list itself, not a copy), or {@code null} if unset. */
+    public List<ConsumeQueueData> getQueueData() {
+        return this.queueData;
+    }
+
+    /** @param queueData the queue entries; the reference is stored as given. */
+    public void setQueueData(final List<ConsumeQueueData> queueData) {
+        this.queueData = queueData;
+    }
+
+    /** @return the maximum queue index. */
+    public long getMaxQueueIndex() {
+        return this.maxQueueIndex;
+    }
+
+    /** @param maxQueueIndex the maximum queue index. */
+    public void setMaxQueueIndex(final long maxQueueIndex) {
+        this.maxQueueIndex = maxQueueIndex;
+    }
+
+    /** @return the minimum queue index. */
+    public long getMinQueueIndex() {
+        return this.minQueueIndex;
+    }
+
+    /** @param minQueueIndex the minimum queue index. */
+    public void setMinQueueIndex(final long minQueueIndex) {
+        this.minQueueIndex = minQueueIndex;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
index 8a59213..106e89e 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageRequestHeader.java
@@ -46,6 +46,7 @@ public class PullMessageRequestHeader implements CommandCustomHeader {
private String subscription;
@CFNotNull
private Long subVersion;
+ private String expressionType;
@Override
public void checkFields() throws RemotingCommandException {
@@ -130,4 +131,12 @@ public class PullMessageRequestHeader implements CommandCustomHeader {
public void setSubVersion(Long subVersion) {
this.subVersion = subVersion;
}
+
+    /**
+     * @return the expression type carried by this header; may be {@code null}, since the field
+     * is not marked {@code @CFNotNull}.
+     */
+    public String getExpressionType() {
+        return this.expressionType;
+    }
+
+    /** @param expressionType the expression type to carry in this header. */
+    public void setExpressionType(final String expressionType) {
+        this.expressionType = expressionType;
+    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumeQueueRequestHeader.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumeQueueRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumeQueueRequestHeader.java
new file mode 100644
index 0000000..642fe17
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryConsumeQueueRequestHeader.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.header;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+/**
+ * Custom command header naming a topic, queue id, start index, entry count and consumer group.
+ * Plain mutable bean with no field validation.
+ */
+public class QueryConsumeQueueRequestHeader implements CommandCustomHeader {
+
+    private String topic;
+    private int queueId;
+    private long index;
+    private int count;
+    private String consumerGroup;
+
+    /** @return the topic, or {@code null} if unset. */
+    public String getTopic() {
+        return this.topic;
+    }
+
+    /** @param topic the topic. */
+    public void setTopic(final String topic) {
+        this.topic = topic;
+    }
+
+    /** @return the queue id. */
+    public int getQueueId() {
+        return this.queueId;
+    }
+
+    /** @param queueId the queue id. */
+    public void setQueueId(final int queueId) {
+        this.queueId = queueId;
+    }
+
+    /** @return the index. */
+    public long getIndex() {
+        return this.index;
+    }
+
+    /** @param index the index. */
+    public void setIndex(final long index) {
+        this.index = index;
+    }
+
+    /** @return the count. */
+    public int getCount() {
+        return this.count;
+    }
+
+    /** @param count the count. */
+    public void setCount(final int count) {
+        this.count = count;
+    }
+
+    /** @return the consumer group, or {@code null} if unset. */
+    public String getConsumerGroup() {
+        return this.consumerGroup;
+    }
+
+    /** @param consumerGroup the consumer group. */
+    public void setConsumerGroup(final String consumerGroup) {
+        this.consumerGroup = consumerGroup;
+    }
+
+    /**
+     * Performs no checks: every field is accepted as is.
+     */
+    @Override
+    public void checkFields() throws RemotingCommandException {
+        // intentionally empty
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/SubscriptionData.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/SubscriptionData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/SubscriptionData.java
index 81f5954..e456b7e 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/SubscriptionData.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/SubscriptionData.java
@@ -32,6 +32,7 @@ public class SubscriptionData implements Comparable<SubscriptionData> {
private Set<String> tagsSet = new HashSet<String>();
private Set<Integer> codeSet = new HashSet<Integer>();
private long subVersion = System.currentTimeMillis();
+ private String expressionType;
@JSONField(serialize = false)
private String filterClassSource;
@@ -102,6 +103,14 @@ public class SubscriptionData implements Comparable<SubscriptionData> {
this.classFilterMode = classFilterMode;
}
+    /** @return the expression type of this subscription, or {@code null} if none was set. */
+    public String getExpressionType() {
+        return this.expressionType;
+    }
+
+    /** @param expressionType the expression type of this subscription. */
+    public void setExpressionType(final String expressionType) {
+        this.expressionType = expressionType;
+    }
+
@Override
public int hashCode() {
final int prime = 31;
@@ -111,6 +120,7 @@ public class SubscriptionData implements Comparable<SubscriptionData> {
result = prime * result + ((subString == null) ? 0 : subString.hashCode());
result = prime * result + ((tagsSet == null) ? 0 : tagsSet.hashCode());
result = prime * result + ((topic == null) ? 0 : topic.hashCode());
+ result = prime * result + ((expressionType == null) ? 0 : expressionType.hashCode());
return result;
}
@@ -147,6 +157,11 @@ public class SubscriptionData implements Comparable<SubscriptionData> {
return false;
} else if (!topic.equals(other.topic))
return false;
+ if (expressionType == null) {
+ if (other.expressionType != null)
+ return false;
+ } else if (!expressionType.equals(other.expressionType))
+ return false;
return true;
}
@@ -154,7 +169,7 @@ public class SubscriptionData implements Comparable<SubscriptionData> {
public String toString() {
return "SubscriptionData [classFilterMode=" + classFilterMode + ", topic=" + topic + ", subString="
+ subString + ", tagsSet=" + tagsSet + ", codeSet=" + codeSet + ", subVersion=" + subVersion
- + "]";
+ + ", expressionType=" + expressionType + "]";
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/common/src/test/java/org/apache/rocketmq/common/filter/FilterAPITest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/rocketmq/common/filter/FilterAPITest.java b/common/src/test/java/org/apache/rocketmq/common/filter/FilterAPITest.java
index 5137f32..c5f8460 100644
--- a/common/src/test/java/org/apache/rocketmq/common/filter/FilterAPITest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/filter/FilterAPITest.java
@@ -42,4 +42,53 @@ public class FilterAPITest {
}
assertThat(subscriptionData.getTagsSet()).isEqualTo(tagSet);
}
+
+    /**
+     * Builds a TAG subscription from {@code "A || B"} and checks topic, sub string, expression
+     * type and the resulting tag set.
+     */
+    @Test
+    public void testBuildTagSome() {
+        try {
+            SubscriptionData subscriptionData = FilterAPI.build(
+                "TOPIC", "A || B", ExpressionType.TAG
+            );
+
+            assertThat(subscriptionData).isNotNull();
+            assertThat(subscriptionData.getTopic()).isEqualTo("TOPIC");
+            assertThat(subscriptionData.getSubString()).isEqualTo("A || B");
+            assertThat(ExpressionType.isTagType(subscriptionData.getExpressionType())).isTrue();
+
+            assertThat(subscriptionData.getTagsSet()).isNotNull();
+            // The tags live in a Set, which promises no iteration order; containsOnly checks
+            // membership without also asserting order the way containsExactly does.
+            assertThat(subscriptionData.getTagsSet()).containsOnly("A", "B");
+        } catch (Exception e) {
+            e.printStackTrace();
+            assertThat(Boolean.FALSE).isTrue();
+        }
+    }
+
+    /**
+     * Builds an SQL92 subscription and checks that topic and expression type are carried over.
+     */
+    @Test
+    public void testBuildSQL() {
+        try {
+            final SubscriptionData built = FilterAPI.build("TOPIC", "a is not null", ExpressionType.SQL92);
+
+            assertThat(built).isNotNull();
+            assertThat(built.getTopic()).isEqualTo("TOPIC");
+            assertThat(built.getExpressionType()).isEqualTo(ExpressionType.SQL92);
+        } catch (Exception e) {
+            // Any exception from build() is a test failure.
+            e.printStackTrace();
+            assertThat(Boolean.FALSE).isTrue();
+        }
+    }
+
+    /**
+     * Expects building an SQL92 subscription with a {@code null} expression to throw.
+     */
+    @Test
+    public void testBuildSQLWithNullSubString() {
+        try {
+            FilterAPI.build("TOPIC", null, ExpressionType.SQL92);
+
+            // Reaching this line means nothing was thrown. The failing assertion raises an
+            // AssertionError, which is not an Exception, so the catch below cannot swallow it.
+            assertThat(Boolean.FALSE).isTrue();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/common/src/test/java/org/apache/rocketmq/common/message/MessageDecoderTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/rocketmq/common/message/MessageDecoderTest.java b/common/src/test/java/org/apache/rocketmq/common/message/MessageDecoderTest.java
new file mode 100644
index 0000000..d14d6b0
--- /dev/null
+++ b/common/src/test/java/org/apache/rocketmq/common/message/MessageDecoderTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.message;
+
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for {@link MessageDecoder}.
+ */
+public class MessageDecoderTest {
+
+    /**
+     * Encodes a message carrying three user properties and verifies that
+     * {@link MessageDecoder#decodeProperties(ByteBuffer)} reads them back from the encoded bytes.
+     */
+    @Test
+    public void testDecodeProperties() {
+        MessageExt messageExt = new MessageExt();
+
+        messageExt.setMsgId("645100FA00002A9F000000489A3AA09E");
+        messageExt.setTopic("abc");
+        messageExt.setBody("hello!q!".getBytes());
+        try {
+            messageExt.setBornHost(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0));
+        } catch (UnknownHostException e) {
+            e.printStackTrace();
+            assertThat(Boolean.FALSE).isTrue();
+        }
+        messageExt.setBornTimestamp(System.currentTimeMillis());
+        messageExt.setCommitLogOffset(123456);
+        messageExt.setPreparedTransactionOffset(0);
+        messageExt.setQueueId(0);
+        messageExt.setQueueOffset(123);
+        messageExt.setReconsumeTimes(0);
+        try {
+            messageExt.setStoreHost(new InetSocketAddress(InetAddress.getLocalHost(), 0));
+        } catch (UnknownHostException e) {
+            e.printStackTrace();
+            assertThat(Boolean.FALSE).isTrue();
+        }
+
+        messageExt.putUserProperty("a", "123");
+        messageExt.putUserProperty("b", "hello");
+        messageExt.putUserProperty("c", "3.14");
+
+        byte[] msgBytes = new byte[0];
+        try {
+            msgBytes = MessageDecoder.encode(messageExt, false);
+        } catch (Exception e) {
+            e.printStackTrace();
+            assertThat(Boolean.FALSE).isTrue();
+        }
+
+        // No flip() needed: decodeProperties locates the fields with absolute reads and sets
+        // the buffer position itself before reading the properties bytes.
+        ByteBuffer byteBuffer = ByteBuffer.allocate(msgBytes.length);
+        byteBuffer.put(msgBytes);
+
+        Map<String, String> properties = MessageDecoder.decodeProperties(byteBuffer);
+
+        // Actual value goes into assertThat(), expected into isEqualTo(), so that a failure
+        // message reports the two the right way round.
+        assertThat(properties).isNotNull();
+        assertThat(properties.get("a")).isEqualTo("123");
+        assertThat(properties.get("b")).isEqualTo("hello");
+        assertThat(properties.get("c")).isEqualTo("3.14");
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/distribution/conf/logback_broker.xml
----------------------------------------------------------------------
diff --git a/distribution/conf/logback_broker.xml b/distribution/conf/logback_broker.xml
index 05c0ee4..dd5c63f 100644
--- a/distribution/conf/logback_broker.xml
+++ b/distribution/conf/logback_broker.xml
@@ -222,6 +222,29 @@
<appender-ref ref="RocketmqRebalanceLockAppender_inner"/>
</appender>
+ <!-- Filter log file: once filter.log exceeds 100MB it is rolled to otherdays/filter.N.log, N = 1..10 -->
+ <appender name="RocketmqFilterAppender_inner"
+ class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <file>${user.home}/logs/rocketmqlogs/filter.log</file>
+ <append>true</append>
+ <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
+ <fileNamePattern>${user.home}/logs/rocketmqlogs/otherdays/filter.%i.log
+ </fileNamePattern>
+ <minIndex>1</minIndex>
+ <maxIndex>10</maxIndex>
+ </rollingPolicy>
+ <triggeringPolicy
+ class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+ <maxFileSize>100MB</maxFileSize>
+ </triggeringPolicy>
+ <encoder>
+ <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern>
+ <charset class="java.nio.charset.Charset">UTF-8</charset>
+ </encoder>
+ </appender>
+ <!-- Asynchronous wrapper that forwards events to RocketmqFilterAppender_inner -->
+ <appender name="RocketmqFilterAppender" class="ch.qos.logback.classic.AsyncAppender">
+ <appender-ref ref="RocketmqFilterAppender_inner"/>
+ </appender>
+
<appender name="RocketmqStatsAppender"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${user.home}/logs/rocketmqlogs/stats.log</file>
@@ -321,6 +344,11 @@
<appender-ref ref="RocketmqCommercialAppender"/>
</logger>
+ <!-- Sends the RocketmqFilter logger (INFO and above) to RocketmqFilterAppender only; additivity is off, so events do not reach the root appenders -->
+ <logger name="RocketmqFilter" additivity="false">
+ <level value="INFO"/>
+ <appender-ref ref="RocketmqFilterAppender"/>
+ </logger>
+
<root>
<level value="INFO"/>
<appender-ref ref="DefaultAppender"/>
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/distribution/release.xml
----------------------------------------------------------------------
diff --git a/distribution/release.xml b/distribution/release.xml
index 2d3ec1e..9e4ef2a 100644
--- a/distribution/release.xml
+++ b/distribution/release.xml
@@ -67,6 +67,7 @@
<include>org.apache.rocketmq:rocketmq-namesrv</include>
<include>org.apache.rocketmq:rocketmq-filtersrv</include>
<include>org.apache.rocketmq:rocketmq-example</include>
+ <include>org.apache.rocketmq:rocketmq-filter</include>
</includes>
<binaries>
<outputDirectory>lib/</outputDirectory>
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
index 473e4c7..3e1b79b 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
@@ -27,10 +27,13 @@ import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.srvutil.ServerUtil;
@@ -46,12 +49,14 @@ public class Consumer {
final String topic = commandLine.hasOption('t') ? commandLine.getOptionValue('t').trim() : "BenchmarkTest";
final String groupPrefix = commandLine.hasOption('g') ? commandLine.getOptionValue('g').trim() : "benchmark_consumer";
final String isPrefixEnable = commandLine.hasOption('p') ? commandLine.getOptionValue('p').trim() : "true";
+ final String filterType = commandLine.hasOption('f') ? commandLine.getOptionValue('f').trim() : null;
+ final String expression = commandLine.hasOption('e') ? commandLine.getOptionValue('e').trim() : null;
String group = groupPrefix;
if (Boolean.parseBoolean(isPrefixEnable)) {
group = groupPrefix + "_" + Long.toString(System.currentTimeMillis() % 100);
}
- System.out.printf("topic %s group %s prefix %s%n", topic, group, isPrefixEnable);
+ System.out.printf("topic: %s, group: %s, prefix: %s, filterType: %s, expression: %s%n", topic, group, isPrefixEnable, filterType, expression);
final StatsBenchmarkConsumer statsBenchmarkConsumer = new StatsBenchmarkConsumer();
@@ -99,7 +104,21 @@ public class Consumer {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group);
consumer.setInstanceName(Long.toString(System.currentTimeMillis()));
- consumer.subscribe(topic, "*");
+ if (filterType == null || expression == null) {
+ consumer.subscribe(topic, "*");
+ } else {
+ if (ExpressionType.TAG.equals(filterType)) {
+ String expr = MixAll.file2String(expression);
+ System.out.printf("Expression: %s%n", expr);
+ consumer.subscribe(topic, MessageSelector.byTag(expr));
+ } else if (ExpressionType.SQL92.equals(filterType)) {
+ String expr = MixAll.file2String(expression);
+ System.out.printf("Expression: %s%n", expr);
+ consumer.subscribe(topic, MessageSelector.bySql(expr));
+ } else {
+ throw new IllegalArgumentException("Not support filter type! " + filterType);
+ }
+ }
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
@@ -142,6 +161,14 @@ public class Consumer {
opt.setRequired(false);
options.addOption(opt);
+ opt = new Option("f", "filterType", true, "TAG, SQL92");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("e", "expression", true, "filter expression content file path.ie: ./test/expr");
+ opt.setRequired(false);
+ options.addOption(opt);
+
return options;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
index 50d750d..2d8d0f6 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java
@@ -18,11 +18,13 @@ package org.apache.rocketmq.example.benchmark;
import java.io.UnsupportedEncodingException;
import java.util.LinkedList;
+import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
@@ -50,13 +52,12 @@ public class Producer {
final int threadCount = commandLine.hasOption('w') ? Integer.parseInt(commandLine.getOptionValue('w')) : 64;
final int messageSize = commandLine.hasOption('s') ? Integer.parseInt(commandLine.getOptionValue('s')) : 128;
final boolean keyEnable = commandLine.hasOption('k') && Boolean.parseBoolean(commandLine.getOptionValue('k'));
+ final int propertySize = commandLine.hasOption('p') ? Integer.parseInt(commandLine.getOptionValue('p')) : 0;
System.out.printf("topic %s threadCount %d messageSize %d keyEnable %s%n", topic, threadCount, messageSize, keyEnable);
final Logger log = ClientLogger.getLog();
- final Message msg = buildMessage(messageSize, topic);
-
final ExecutorService sendThreadPool = Executors.newFixedThreadPool(threadCount);
final StatsBenchmarkProducer statsBenchmark = new StatsBenchmarkProducer();
@@ -117,10 +118,37 @@ public class Producer {
public void run() {
while (true) {
try {
+ final Message msg;
+ try {
+ msg = buildMessage(messageSize, topic);
+ } catch (UnsupportedEncodingException e) {
+ e.printStackTrace();
+ return;
+ }
final long beginTimestamp = System.currentTimeMillis();
if (keyEnable) {
msg.setKeys(String.valueOf(beginTimestamp / 1000));
}
+ if (propertySize > 0) {
+ if (msg.getProperties() != null) {
+ msg.getProperties().clear();
+ }
+ int i = 0;
+ int startValue = (new Random(System.currentTimeMillis())).nextInt(100);
+ int size = 0;
+ while (true) {
+ String prop1 = "prop" + i, prop1V = "hello" + startValue;
+ String prop2 = "prop" + (i + 1), prop2V = String.valueOf(startValue);
+ msg.putUserProperty(prop1, prop1V);
+ msg.putUserProperty(prop2, prop2V);
+ size += prop1.length() + prop2.length() + prop1V.length() + prop2V.length();
+ if (size > propertySize) {
+ break;
+ }
+ i += 2;
+ startValue += 2;
+ }
+ }
producer.send(msg);
statsBenchmark.getSendRequestSuccessCount().incrementAndGet();
statsBenchmark.getReceiveResponseSuccessCount().incrementAndGet();
@@ -214,7 +242,7 @@ class StatsBenchmarkProducer {
private final AtomicLong sendMessageMaxRT = new AtomicLong(0L);
public Long[] createSnapshot() {
- Long[] snap = new Long[] {
+ Long[] snap = new Long[]{
System.currentTimeMillis(),
this.sendRequestSuccessCount.get(),
this.sendRequestFailedCount.get(),
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/example/src/main/java/org/apache/rocketmq/example/filter/SqlConsumer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/filter/SqlConsumer.java b/example/src/main/java/org/apache/rocketmq/example/filter/SqlConsumer.java
new file mode 100644
index 0000000..9a3b813
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/filter/SqlConsumer.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.example.filter;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.MessageSelector;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+
+import java.util.List;
+
+/**
+ * Example push consumer that subscribes to {@code TopicTest} with a SQL92 filter expression.
+ * <p>
+ * The expression selects messages whose tag is {@code TagA} or {@code TagB} and whose
+ * user property {@code a} is between 0 and 3.
+ * </p>
+ */
+public class SqlConsumer {
+
+    public static void main(String[] args) {
+        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
+
+        try {
+            // BETWEEN takes the form "x between low and high"; the "and" keyword is mandatory.
+            consumer.subscribe("TopicTest",
+                MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
+                    "and (a is not null and a between 0 and 3)"));
+        } catch (MQClientException e) {
+            e.printStackTrace();
+            return;
+        }
+
+        consumer.registerMessageListener(new MessageListenerConcurrently() {
+
+            @Override
+            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
+                ConsumeConcurrentlyContext context) {
+                // Pass the data as arguments: message content containing '%' must never be
+                // interpreted as part of the format string.
+                System.out.printf("%s Receive New Messages: %s%n", Thread.currentThread().getName(), msgs);
+                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+            }
+        });
+
+        try {
+            consumer.start();
+        } catch (MQClientException e) {
+            e.printStackTrace();
+            return;
+        }
+        System.out.printf("Consumer Started.%n");
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/example/src/main/java/org/apache/rocketmq/example/filter/SqlProducer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/filter/SqlProducer.java b/example/src/main/java/org/apache/rocketmq/example/filter/SqlProducer.java
new file mode 100644
index 0000000..3f3a0e6
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/filter/SqlProducer.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.example.filter;
+
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+
+/**
+ * Example producer that sends ten messages to {@code TopicTest}, cycling the tag through
+ * {@code TagA}, {@code TagB} and {@code TagC} and attaching the loop index as user property {@code a}.
+ */
+public class SqlProducer {
+
+    public static void main(String[] args) {
+        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
+        try {
+            producer.start();
+        } catch (MQClientException e) {
+            e.printStackTrace();
+            return;
+        }
+
+        try {
+            for (int i = 0; i < 10; i++) {
+                try {
+                    String tag;
+                    int div = i % 3;
+                    if (div == 0) {
+                        tag = "TagA";
+                    } else if (div == 1) {
+                        tag = "TagB";
+                    } else {
+                        tag = "TagC";
+                    }
+                    Message msg = new Message("TopicTest",
+                        tag,
+                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
+                    );
+                    msg.putUserProperty("a", String.valueOf(i));
+
+                    SendResult sendResult = producer.send(msg);
+                    System.out.printf("%s%n", sendResult);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    try {
+                        // Back off briefly before the next attempt.
+                        Thread.sleep(1000);
+                    } catch (InterruptedException e1) {
+                        // Restore the interrupt flag and stop sending instead of ignoring the request.
+                        Thread.currentThread().interrupt();
+                        break;
+                    }
+                }
+            }
+        } finally {
+            // Always release the producer, even if the loop exits abnormally.
+            producer.shutdown();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/filter/pom.xml
----------------------------------------------------------------------
diff --git a/filter/pom.xml b/filter/pom.xml
new file mode 100644
index 0000000..7978f05
--- /dev/null
+++ b/filter/pom.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>rocketmq-all</artifactId>
+ <groupId>org.apache.rocketmq</groupId>
+ <version>4.1.0-incubating-SNAPSHOT</version>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+ <packaging>jar</packaging>
+ <artifactId>rocketmq-filter</artifactId>
+ <name>rocketmq-filter ${project.version}</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>rocketmq-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>rocketmq-srvutil</artifactId>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/filter/src/main/java/org/apache/rocketmq/filter/FilterFactory.java
----------------------------------------------------------------------
diff --git a/filter/src/main/java/org/apache/rocketmq/filter/FilterFactory.java b/filter/src/main/java/org/apache/rocketmq/filter/FilterFactory.java
new file mode 100644
index 0000000..a318548
--- /dev/null
+++ b/filter/src/main/java/org/apache/rocketmq/filter/FilterFactory.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.filter;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Filter factory: a registry that lets additional filter implementations be plugged in.
+ * <p>
+ * A {@link SqlFilter} is registered when this class is initialized. The registry is backed by a
+ * plain {@link HashMap} and performs no synchronization of its own.
+ * </p>
+ */
+public class FilterFactory {
+
+    public static final FilterFactory INSTANCE = new FilterFactory();
+
+    protected static final Map<String, FilterSpi> FILTER_SPI_HOLDER = new HashMap<String, FilterSpi>(4);
+
+    static {
+        FilterFactory.INSTANCE.register(new SqlFilter());
+    }
+
+    /**
+     * Register a filter.
+     * <p>
+     * Note: a registered filter will be used in the broker server, so take care of its
+     * reliability and performance.
+     * </p>
+     *
+     * @param filterSpi the filter to register; must not be null
+     * @throws IllegalArgumentException if {@code filterSpi} is null or a filter with the same
+     * type is already registered
+     */
+    public void register(FilterSpi filterSpi) {
+        if (filterSpi == null) {
+            throw new IllegalArgumentException("Filter spi can not be null!");
+        }
+
+        // Ask for the type once so the duplicate check and the insertion use the same key.
+        final String type = filterSpi.ofType();
+        if (FILTER_SPI_HOLDER.containsKey(type)) {
+            throw new IllegalArgumentException(String.format("Filter spi type(%s) already exist!", type));
+        }
+
+        FILTER_SPI_HOLDER.put(type, filterSpi);
+    }
+
+    /**
+     * Un register a filter.
+     *
+     * @param type the type the filter was registered under
+     * @return the filter that was removed, or null if none was registered for {@code type}
+     */
+    public FilterSpi unRegister(String type) {
+        return FILTER_SPI_HOLDER.remove(type);
+    }
+
+    /**
+     * Get a filter registered, null if none exist.
+     *
+     * @param type the type the filter was registered under
+     * @return the registered filter, or null if none exists for {@code type}
+     */
+    public FilterSpi get(String type) {
+        return FILTER_SPI_HOLDER.get(type);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/filter/src/main/java/org/apache/rocketmq/filter/FilterSpi.java
----------------------------------------------------------------------
diff --git a/filter/src/main/java/org/apache/rocketmq/filter/FilterSpi.java b/filter/src/main/java/org/apache/rocketmq/filter/FilterSpi.java
new file mode 100644
index 0000000..fcc39fa
--- /dev/null
+++ b/filter/src/main/java/org/apache/rocketmq/filter/FilterSpi.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.filter;
+
+import org.apache.rocketmq.filter.expression.Expression;
+import org.apache.rocketmq.filter.expression.MQFilterException;
+
+/**
+ * Filter spi interface: a filter implementation that compiles an expression string into an
+ * {@link Expression} and reports the type it should be registered under.
+ */
+public interface FilterSpi {
+
+ /**
+ * Compile the given expression string.
+ *
+ * @param expr the expression text to compile
+ * @return the compiled expression
+ * @throws org.apache.rocketmq.filter.expression.MQFilterException declared for compilation
+ * failures; the exact conditions are up to the implementation
+ */
+ Expression compile(final String expr) throws MQFilterException;
+
+ /**
+ * Which type this filter handles. {@link FilterFactory} uses the returned value as the
+ * key under which the filter is registered and looked up.
+ *
+ * @return the type identifier of this filter
+ */
+ String ofType();
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/filter/src/main/java/org/apache/rocketmq/filter/SqlFilter.java
----------------------------------------------------------------------
diff --git a/filter/src/main/java/org/apache/rocketmq/filter/SqlFilter.java b/filter/src/main/java/org/apache/rocketmq/filter/SqlFilter.java
new file mode 100644
index 0000000..0c1ffb8
--- /dev/null
+++ b/filter/src/main/java/org/apache/rocketmq/filter/SqlFilter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.filter;
+
+import org.apache.rocketmq.common.filter.ExpressionType;
+import org.apache.rocketmq.filter.expression.Expression;
+import org.apache.rocketmq.filter.expression.MQFilterException;
+import org.apache.rocketmq.filter.parser.SelectorParser;
+
+/**
+ * SQL92 Filter, just a wrapper of {@link org.apache.rocketmq.filter.parser.SelectorParser}.
+ * <p>
+ * Do not use this filter directly. Use {@link FilterFactory#get} to select a filter.
+ * </p>
+ */
+public class SqlFilter implements FilterSpi {
+
+ // Delegates straight to SelectorParser.parse; no additional processing happens here.
+ @Override
+ public Expression compile(final String expr) throws MQFilterException {
+ return SelectorParser.parse(expr);
+ }
+
+ // Registers this filter under the SQL92 expression type.
+ @Override
+ public String ofType() {
+ return ExpressionType.SQL92;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/filter/src/main/java/org/apache/rocketmq/filter/constant/UnaryType.java
----------------------------------------------------------------------
diff --git a/filter/src/main/java/org/apache/rocketmq/filter/constant/UnaryType.java b/filter/src/main/java/org/apache/rocketmq/filter/constant/UnaryType.java
new file mode 100644
index 0000000..d2d04cd
--- /dev/null
+++ b/filter/src/main/java/org/apache/rocketmq/filter/constant/UnaryType.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.filter.constant;
+
+/**
+ * Kinds of unary operation.
+ * NOTE(review): presumably used to tag unary expressions (see UnaryExpression) - confirm
+ * against the classes that reference this enum.
+ */
+public enum UnaryType {
+ NEGATE,
+ IN,
+ NOT,
+ BOOLEANCAST,
+ LIKE
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/filter/src/main/java/org/apache/rocketmq/filter/expression/BinaryExpression.java
----------------------------------------------------------------------
diff --git a/filter/src/main/java/org/apache/rocketmq/filter/expression/BinaryExpression.java b/filter/src/main/java/org/apache/rocketmq/filter/expression/BinaryExpression.java
new file mode 100644
index 0000000..0f172e3
--- /dev/null
+++ b/filter/src/main/java/org/apache/rocketmq/filter/expression/BinaryExpression.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.filter.expression;
+
+/**
+ * An expression which performs an operation on two expression values.
+ * <p>
+ * This class was taken from ActiveMQ org.apache.activemq.filter.BinaryExpression,
+ * </p>
+ */
+public abstract class BinaryExpression implements Expression {
+ protected Expression left;
+ protected Expression right;
+
+ public BinaryExpression(Expression left, Expression right) {
+ this.left = left;
+ this.right = right;
+ }
+
+ public Expression getLeft() {
+ return left;
+ }
+
+ public Expression getRight() {
+ return right;
+ }
+
+ /**
+ * @see Object#toString()
+ */
+ public String toString() {
+ return "(" + left.toString() + " " + getExpressionSymbol() + " " + right.toString() + ")";
+ }
+
+ /**
+ * @see Object#hashCode()
+ */
+ public int hashCode() {
+ return toString().hashCode();
+ }
+
+ /**
+ * @see Object#equals(Object)
+ */
+ public boolean equals(Object o) {
+
+ if (o == null || !this.getClass().equals(o.getClass())) {
+ return false;
+ }
+ return toString().equals(o.toString());
+
+ }
+
+ /**
+ * Returns the symbol that represents this binary expression. For example, addition is
+ * represented by "+"
+ *
+ * @return
+ */
+ public abstract String getExpressionSymbol();
+
+ /**
+ * @param expression
+ */
+ public void setRight(Expression expression) {
+ right = expression;
+ }
+
+ /**
+ * @param expression
+ */
+ public void setLeft(Expression expression) {
+ left = expression;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/filter/src/main/java/org/apache/rocketmq/filter/expression/BooleanExpression.java
----------------------------------------------------------------------
diff --git a/filter/src/main/java/org/apache/rocketmq/filter/expression/BooleanExpression.java b/filter/src/main/java/org/apache/rocketmq/filter/expression/BooleanExpression.java
new file mode 100644
index 0000000..bb54632
--- /dev/null
+++ b/filter/src/main/java/org/apache/rocketmq/filter/expression/BooleanExpression.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.filter.expression;
+
+/**
+ * A BooleanExpression is an expression that always
+ * produces a Boolean result.
+ * <p>
+ * This class was taken from ActiveMQ org.apache.activemq.filter.BooleanExpression,
+ * but the parameter is changed to an interface.
+ * </p>
+ *
+ * @see org.apache.rocketmq.filter.expression.EvaluationContext
+ */
+public interface BooleanExpression extends Expression {
+
+ /**
+ * Tests this expression against the given context.
+ *
+ * @param context the context the expression is evaluated against
+ * @return true if the expression evaluates to Boolean.TRUE.
+ * @throws Exception declared for evaluation failures; the concrete causes depend on the implementation
+ */
+ boolean matches(EvaluationContext context) throws Exception;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/filter/src/main/java/org/apache/rocketmq/filter/expression/ComparisonExpression.java
----------------------------------------------------------------------
diff --git a/filter/src/main/java/org/apache/rocketmq/filter/expression/ComparisonExpression.java b/filter/src/main/java/org/apache/rocketmq/filter/expression/ComparisonExpression.java
new file mode 100644
index 0000000..8b82e57
--- /dev/null
+++ b/filter/src/main/java/org/apache/rocketmq/filter/expression/ComparisonExpression.java
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.filter.expression;
+
+import java.util.List;
+
+/**
+ * A filter performing a comparison of two objects
+ * <p>
+ * This class was taken from ActiveMQ org.apache.activemq.filter.ComparisonExpression,
+ * but:
+ * 1. Remove LIKE expression, and related methods;
+ * 2. Extract a new method __compare which has int return value;
+ * 3. When create between expression, check whether left value is less or equal than right value;
+ * 4. For string type value(can not convert to number), only equal or unequal comparison are supported.
+ * </p>
+ */
+public abstract class ComparisonExpression extends BinaryExpression implements BooleanExpression {
+
+ public static final ThreadLocal<Boolean> CONVERT_STRING_EXPRESSIONS = new ThreadLocal<Boolean>();
+
+ boolean convertStringExpressions = false;
+
+ /**
+ * Creates a comparison of two operands and records whether
+ * {@link #CONVERT_STRING_EXPRESSIONS} holds a non-null value for the constructing thread.
+ *
+ * @param left the left operand
+ * @param right the right operand
+ */
+ public ComparisonExpression(Expression left, Expression right) {
+ super(left, right);
+ // Only presence matters: any non-null value in the thread-local enables the flag.
+ convertStringExpressions = CONVERT_STRING_EXPRESSIONS.get() != null;
+ }
+
+ /**
+  * Builds {@code value >= left AND value <= right}.
+  * <p>
+  * When both bounds are constants they are validated up front: neither may be null and,
+  * if both are comparable, the right bound must not compare lower than the left bound.
+  * </p>
+  *
+  * @param value the expression being tested
+  * @param left the lower bound
+  * @param right the upper bound
+  * @return the combined range expression
+  */
+ public static BooleanExpression createBetween(Expression value, Expression left, Expression right) {
+     boolean constantBounds = left instanceof ConstantExpression && right instanceof ConstantExpression;
+     if (constantBounds) {
+         Object low = ((ConstantExpression) left).getValue();
+         Object high = ((ConstantExpression) right).getValue();
+         if (low == null || high == null) {
+             throw new RuntimeException("Illegal values of between, values can not be null!");
+         }
+         if (low instanceof Comparable && high instanceof Comparable
+             && __compare((Comparable) high, (Comparable) low, true) < 0) {
+             throw new RuntimeException(
+                 String.format("Illegal values of between, left value(%s) must less than or equal to right value(%s)", low, high)
+             );
+         }
+     }
+
+     BooleanExpression lowerBound = createGreaterThanEqual(value, left);
+     BooleanExpression upperBound = createLessThanEqual(value, right);
+     return LogicExpression.createAND(lowerBound, upperBound);
+ }
+
+ /**
+ * Builds {@code value < left OR value > right}. Unlike {@link #createBetween}, this method
+ * itself performs no up-front validation of the two bounds against each other.
+ */
+ public static BooleanExpression createNotBetween(Expression value, Expression left, Expression right) {
+ return LogicExpression.createOR(createLessThan(value, left), createGreaterThan(value, right));
+ }
+
+ /**
+  * Builds an IN test of a property against a list of elements.
+  *
+  * @param left the expression to test; must be a {@code PropertyExpression}
+  * @param elements the candidate elements
+  * @return the IN expression
+  * @throws RuntimeException if {@code left} is not a {@code PropertyExpression}
+  */
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public static BooleanExpression createInFilter(Expression left, List elements) {
+     if (left instanceof PropertyExpression) {
+         return UnaryExpression.createInExpression((PropertyExpression) left, elements, false);
+     }
+     throw new RuntimeException("Expected a property for In expression, got: " + left);
+ }
+
+ /**
+  * Builds a NOT IN test of a property against a list of elements.
+  *
+  * @param left the expression to test; must be a {@code PropertyExpression}
+  * @param elements the candidate elements
+  * @return the NOT IN expression
+  * @throws RuntimeException if {@code left} is not a {@code PropertyExpression}
+  */
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public static BooleanExpression createNotInFilter(Expression left, List elements) {
+     if (left instanceof PropertyExpression) {
+         return UnaryExpression.createInExpression((PropertyExpression) left, elements, true);
+     }
+     throw new RuntimeException("Expected a property for In expression, got: " + left);
+ }
+
+ /**
+ * Builds an equality test of {@code left} against {@code ConstantExpression.NULL}.
+ * Goes through {@code doCreateEqual} directly, so the operand checks of
+ * {@link #createEqual} are not applied.
+ */
+ public static BooleanExpression createIsNull(Expression left) {
+ return doCreateEqual(left, ConstantExpression.NULL);
+ }
+
+ /**
+ * Builds the negation of an equality test of {@code left} against
+ * {@code ConstantExpression.NULL}.
+ */
+ public static BooleanExpression createIsNotNull(Expression left) {
+ return UnaryExpression.createNOT(doCreateEqual(left, ConstantExpression.NULL));
+ }
+
+ /**
+ * Builds the negation of {@link #createEqual}; the same operand checks therefore apply.
+ */
+ public static BooleanExpression createNotEqual(Expression left, Expression right) {
+ return UnaryExpression.createNOT(createEqual(left, right));
+ }
+
+ /**
+ * Builds an equality test after validating both operands individually and against
+ * each other.
+ *
+ * @throws RuntimeException if one of the operand checks rejects an operand
+ */
+ public static BooleanExpression createEqual(Expression left, Expression right) {
+ checkEqualOperand(left);
+ checkEqualOperand(right);
+ checkEqualOperandCompatability(left, right);
+ return doCreateEqual(left, right);
+ }
+
+ /**
+ * Builds the equality comparison itself, without validating the operands.
+ * <p>
+ * The returned expression evaluates to null when only the left value is null, to
+ * Boolean.FALSE when only the right value is null, and to Boolean.TRUE when both values
+ * are the same reference (including both null) or are equal. Otherwise comparable
+ * values are handed to {@code compare}; anything else yields Boolean.FALSE.
+ * </p>
+ */
+ @SuppressWarnings({"rawtypes"})
+ private static BooleanExpression doCreateEqual(Expression left, Expression right) {
+ return new ComparisonExpression(left, right) {
+
+ public Object evaluate(EvaluationContext context) throws Exception {
+ Object lv = left.evaluate(context);
+ Object rv = right.evaluate(context);
+
+ // Exactly one of the values is null (XOR); the both-null case falls through.
+ if (lv == null ^ rv == null) {
+ if (lv == null) {
+ return null;
+ }
+ return Boolean.FALSE;
+ }
+ // Reference check first: it also covers the case where both values are null.
+ if (lv == rv || lv.equals(rv)) {
+ return Boolean.TRUE;
+ }
+ if (lv instanceof Comparable && rv instanceof Comparable) {
+ return compare((Comparable) lv, (Comparable) rv);
+ }
+ return Boolean.FALSE;
+ }
+
+ // A comparison result of zero means the values are equal.
+ protected boolean asBoolean(int answer) {
+ return answer == 0;
+ }
+
+ public String getExpressionSymbol() {
+ return "==";
+ }
+ };
+ }
+
+ /**
+ * Builds a {@code >} comparison after checking both operands with
+ * {@link #checkLessThanOperand}.
+ */
+ public static BooleanExpression createGreaterThan(final Expression left, final Expression right) {
+ checkLessThanOperand(left);
+ checkLessThanOperand(right);
+ return new ComparisonExpression(left, right) {
+ // True for a positive comparison result.
+ protected boolean asBoolean(int answer) {
+ return answer > 0;
+ }
+
+ public String getExpressionSymbol() {
+ return ">";
+ }
+ };
+ }
+
+ /**
+ * Builds a {@code >=} comparison after checking both operands with
+ * {@link #checkLessThanOperand}.
+ */
+ public static BooleanExpression createGreaterThanEqual(final Expression left, final Expression right) {
+ checkLessThanOperand(left);
+ checkLessThanOperand(right);
+ return new ComparisonExpression(left, right) {
+ // True for a zero or positive comparison result.
+ protected boolean asBoolean(int answer) {
+ return answer >= 0;
+ }
+
+ public String getExpressionSymbol() {
+ return ">=";
+ }
+ };
+ }
+
+ /**
+ * Builds a {@code <} comparison after checking both operands with
+ * {@link #checkLessThanOperand}.
+ */
+ public static BooleanExpression createLessThan(final Expression left, final Expression right) {
+ checkLessThanOperand(left);
+ checkLessThanOperand(right);
+ return new ComparisonExpression(left, right) {
+
+ // True for a negative comparison result.
+ protected boolean asBoolean(int answer) {
+ return answer < 0;
+ }
+
+ public String getExpressionSymbol() {
+ return "<";
+ }
+
+ };
+ }
+
+ /**
+ * Builds a {@code <=} comparison after checking both operands with
+ * {@link #checkLessThanOperand}.
+ */
+ public static BooleanExpression createLessThanEqual(final Expression left, final Expression right) {
+ checkLessThanOperand(left);
+ checkLessThanOperand(right);
+ return new ComparisonExpression(left, right) {
+
+ // True for a zero or negative comparison result.
+ protected boolean asBoolean(int answer) {
+ return answer <= 0;
+ }
+
+ public String getExpressionSymbol() {
+ return "<=";
+ }
+ };
+ }
+
+ /**
+  * Only numeric expressions can be used in {@code >}, {@code >=}, {@code <} or {@code <=}
+  * expressions. A constant must hold a {@link Number}; a non-constant operand must not be
+  * a {@link BooleanExpression}.
+  *
+  * @param expr the operand to validate
+  * @throws RuntimeException if the operand cannot be used in an ordering comparison
+  */
+ public static void checkLessThanOperand(Expression expr) {
+     if (expr instanceof ConstantExpression) {
+         Object constant = ((ConstantExpression) expr).getValue();
+         if (!(constant instanceof Number)) {
+             // Not a number, so it's boolean or a String..
+             throw new RuntimeException("Value '" + expr + "' cannot be compared.");
+         }
+         return;
+     }
+
+     if (expr instanceof BooleanExpression) {
+         throw new RuntimeException("Value '" + expr + "' cannot be compared.");
+     }
+ }
+
+ /**
+ * Validates that the expression can be used in == or <> expression. Cannot
+ * not be NULL TRUE or FALSE litterals.
+ *
+ * @param expr
+ */
+ public static void checkEqualOperand(Expression expr) {
+ if (expr instanceof ConstantExpression) {
+ Object value = ((ConstantExpression) expr).getValue();
+ if (value == null) {
+ throw new RuntimeException("'" + expr + "' cannot be compared.");
+ }
+ }
+ }
+
+ /**
+  * Rejects an equality test between two constants when the left one is a boolean
+  * expression and the right one is not.
+  *
+  * @param left the left operand
+  * @param right the right operand
+  * @throws RuntimeException if the two constants are incompatible
+  */
+ private static void checkEqualOperandCompatability(Expression left, Expression right) {
+     boolean bothConstant = left instanceof ConstantExpression && right instanceof ConstantExpression;
+     if (bothConstant && left instanceof BooleanExpression && !(right instanceof BooleanExpression)) {
+         throw new RuntimeException("'" + left + "' cannot be compared with '" + right + "'");
+     }
+ }
+
+    /**
+     * Evaluates both operands and compares them.
+     * <p>
+     * If either operand evaluates to {@code null} the result is {@code null}.
+     * For the ordering operators ({@code >=}, {@code >}, {@code <}, {@code <=}),
+     * two String operands are parsed as doubles and compared numerically;
+     * if that fails the comparison is rejected.
+     *
+     * @param context context the operands are evaluated against
+     * @return the comparison verdict, or {@code null} if an operand is {@code null}
+     * @throws Exception if an operand fails to evaluate, or two strings cannot be ordered
+     */
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public Object evaluate(EvaluationContext context) throws Exception {
+        final Comparable<Comparable> leftValue = (Comparable) left.evaluate(context);
+        if (leftValue == null) {
+            return null;
+        }
+        final Comparable rightValue = (Comparable) right.evaluate(context);
+        if (rightValue == null) {
+            return null;
+        }
+
+        final String symbol = getExpressionSymbol();
+        final boolean orderingOperator = symbol.equals(">=") || symbol.equals(">")
+            || symbol.equals("<") || symbol.equals("<=");
+        final boolean bothStrings = leftValue.getClass() == String.class && rightValue.getClass() == String.class;
+
+        if (orderingOperator && bothStrings) {
+            // Strings have no ordering here; they are only comparable when both parse as numbers.
+            try {
+                final Comparable leftNumber = Double.valueOf((String) (Comparable) leftValue);
+                final Comparable rightNumber = Double.valueOf((String) rightValue);
+                return compare(leftNumber, rightNumber);
+            } catch (Exception e) {
+                throw new RuntimeException("It's illegal to compare string by '>=', '>', '<', '<='. lv=" + leftValue + ", rv=" + rightValue, e);
+            }
+        }
+        return compare(leftValue, rightValue);
+    }
+
+    /**
+     * Compares two values, converting one side first when their classes differ.
+     * <p>
+     * Numeric operands of different classes are widened to a common class.
+     * When {@code convertStringExpressions} is set, a String operand is parsed
+     * into the class of the other operand. If the classes differ and no
+     * conversion applies, {@code -1} is returned without comparing.
+     *
+     * @param lv                       left value, not {@code null}
+     * @param rv                       right value, not {@code null}
+     * @param convertStringExpressions whether a String operand may be parsed into the other operand's class
+     * @return the result of {@code lv.compareTo(rv)} after conversion, or {@code -1} if the operands cannot be reconciled
+     * @throws RuntimeException wrapping the {@link NumberFormatException} raised when a String cannot be parsed as a number
+     */
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    protected static int __compare(Comparable lv, Comparable rv, boolean convertStringExpressions) {
+        Class<? extends Comparable> lc = lv.getClass();
+        Class<? extends Comparable> rc = rv.getClass();
+        // If the objects are not of the same type,
+        // try to convert up to allow the comparison.
+        if (lc != rc) {
+            try {
+                if (lc == Boolean.class) {
+                    if (convertStringExpressions && rc == String.class) {
+                        // The String is the right-hand operand, so that is the side to parse.
+                        rv = Boolean.valueOf((String) rv);
+                    } else {
+                        return -1;
+                    }
+                } else if (lc == Byte.class) {
+                    if (rc == Short.class) {
+                        lv = Short.valueOf(((Number) lv).shortValue());
+                    } else if (rc == Integer.class) {
+                        lv = Integer.valueOf(((Number) lv).intValue());
+                    } else if (rc == Long.class) {
+                        lv = Long.valueOf(((Number) lv).longValue());
+                    } else if (rc == Float.class) {
+                        lv = Float.valueOf(((Number) lv).floatValue());
+                    } else if (rc == Double.class) {
+                        lv = Double.valueOf(((Number) lv).doubleValue());
+                    } else if (convertStringExpressions && rc == String.class) {
+                        rv = Byte.valueOf((String) rv);
+                    } else {
+                        return -1;
+                    }
+                } else if (lc == Short.class) {
+                    if (rc == Integer.class) {
+                        lv = Integer.valueOf(((Number) lv).intValue());
+                    } else if (rc == Long.class) {
+                        lv = Long.valueOf(((Number) lv).longValue());
+                    } else if (rc == Float.class) {
+                        lv = Float.valueOf(((Number) lv).floatValue());
+                    } else if (rc == Double.class) {
+                        lv = Double.valueOf(((Number) lv).doubleValue());
+                    } else if (convertStringExpressions && rc == String.class) {
+                        rv = Short.valueOf((String) rv);
+                    } else {
+                        return -1;
+                    }
+                } else if (lc == Integer.class) {
+                    if (rc == Long.class) {
+                        lv = Long.valueOf(((Number) lv).longValue());
+                    } else if (rc == Float.class) {
+                        lv = Float.valueOf(((Number) lv).floatValue());
+                    } else if (rc == Double.class) {
+                        lv = Double.valueOf(((Number) lv).doubleValue());
+                    } else if (convertStringExpressions && rc == String.class) {
+                        rv = Integer.valueOf((String) rv);
+                    } else {
+                        return -1;
+                    }
+                } else if (lc == Long.class) {
+                    if (rc == Integer.class) {
+                        rv = Long.valueOf(((Number) rv).longValue());
+                    } else if (rc == Float.class) {
+                        lv = Float.valueOf(((Number) lv).floatValue());
+                    } else if (rc == Double.class) {
+                        lv = Double.valueOf(((Number) lv).doubleValue());
+                    } else if (convertStringExpressions && rc == String.class) {
+                        rv = Long.valueOf((String) rv);
+                    } else {
+                        return -1;
+                    }
+                } else if (lc == Float.class) {
+                    if (rc == Integer.class) {
+                        rv = Float.valueOf(((Number) rv).floatValue());
+                    } else if (rc == Long.class) {
+                        rv = Float.valueOf(((Number) rv).floatValue());
+                    } else if (rc == Double.class) {
+                        lv = Double.valueOf(((Number) lv).doubleValue());
+                    } else if (convertStringExpressions && rc == String.class) {
+                        rv = Float.valueOf((String) rv);
+                    } else {
+                        return -1;
+                    }
+                } else if (lc == Double.class) {
+                    if (rc == Integer.class) {
+                        rv = Double.valueOf(((Number) rv).doubleValue());
+                    } else if (rc == Long.class) {
+                        rv = Double.valueOf(((Number) rv).doubleValue());
+                    } else if (rc == Float.class) {
+                        // Widen to Double so both sides share a class; Double.compareTo(Float) would fail.
+                        rv = Double.valueOf(((Number) rv).doubleValue());
+                    } else if (convertStringExpressions && rc == String.class) {
+                        rv = Double.valueOf((String) rv);
+                    } else {
+                        return -1;
+                    }
+                } else if (convertStringExpressions && lc == String.class) {
+                    if (rc == Boolean.class) {
+                        lv = Boolean.valueOf((String) lv);
+                    } else if (rc == Byte.class) {
+                        lv = Byte.valueOf((String) lv);
+                    } else if (rc == Short.class) {
+                        lv = Short.valueOf((String) lv);
+                    } else if (rc == Integer.class) {
+                        lv = Integer.valueOf((String) lv);
+                    } else if (rc == Long.class) {
+                        lv = Long.valueOf((String) lv);
+                    } else if (rc == Float.class) {
+                        lv = Float.valueOf((String) lv);
+                    } else if (rc == Double.class) {
+                        lv = Double.valueOf((String) lv);
+                    } else {
+                        return -1;
+                    }
+                } else {
+                    return -1;
+                }
+            } catch (NumberFormatException e) {
+                throw new RuntimeException(e);
+            }
+        }
+        return lv.compareTo(rv);
+    }
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ protected Boolean compare(Comparable lv, Comparable rv) {
+ return asBoolean(__compare(lv, rv, convertStringExpressions)) ? Boolean.TRUE : Boolean.FALSE;
+ }
+
+ /**
+ * Converts the integer ordering produced by the comparison into this
+ * operator's boolean verdict.
+ *
+ * @param answer ordering value returned by the comparison
+ * @return {@code true} if {@code answer} satisfies this operator
+ */
+ protected abstract boolean asBoolean(int answer);
+
+    /**
+     * Tells whether this comparison holds in the given context.
+     *
+     * @param context context the operands are evaluated against
+     * @return {@code true} only if {@link #evaluate(EvaluationContext)} yields a true Boolean;
+     *         a {@code null} or non-Boolean result does not match
+     * @throws Exception if evaluation fails
+     */
+    public boolean matches(EvaluationContext context) throws Exception {
+        // equals() instead of ==: also handles null and a Boolean that is not the canonical instance.
+        return Boolean.TRUE.equals(evaluate(context));
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/filter/src/main/java/org/apache/rocketmq/filter/expression/ConstantExpression.java
----------------------------------------------------------------------
diff --git a/filter/src/main/java/org/apache/rocketmq/filter/expression/ConstantExpression.java b/filter/src/main/java/org/apache/rocketmq/filter/expression/ConstantExpression.java
new file mode 100644
index 0000000..ca70f51
--- /dev/null
+++ b/filter/src/main/java/org/apache/rocketmq/filter/expression/ConstantExpression.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.filter.expression;
+
+/**
+ * Represents a constant expression
+ * <p>
+ * This class was taken from ActiveMQ org.apache.activemq.filter.ConstantExpression,
+ * but:
+ * 1. For long type constant, the range bound by java Long type;
+ * 2. For float type constant, the range bound by java Double type;
+ * 3. Remove Hex and Octal expression;
+ * 4. Add now expression to support to get current time.
+ * </p>
+ */
+public class ConstantExpression implements Expression {
+
+ static class BooleanConstantExpression extends ConstantExpression implements BooleanExpression {
+ public BooleanConstantExpression(Object value) {
+ super(value);
+ }
+
+ public boolean matches(EvaluationContext context) throws Exception {
+ Object object = evaluate(context);
+ return object != null && object == Boolean.TRUE;
+ }
+ }
+
+ public static final BooleanConstantExpression NULL = new BooleanConstantExpression(null);
+ public static final BooleanConstantExpression TRUE = new BooleanConstantExpression(Boolean.TRUE);
+ public static final BooleanConstantExpression FALSE = new BooleanConstantExpression(Boolean.FALSE);
+
+ private Object value;
+
+ public ConstantExpression(Object value) {
+ this.value = value;
+ }
+
+ public static ConstantExpression createFromDecimal(String text) {
+
+ // Strip off the 'l' or 'L' if needed.
+ if (text.endsWith("l") || text.endsWith("L")) {
+ text = text.substring(0, text.length() - 1);
+ }
+
+ // only support Long.MIN_VALUE ~ Long.MAX_VALUE
+ Number value = new Long(text);
+// try {
+// value = new Long(text);
+// } catch (NumberFormatException e) {
+// // The number may be too big to fit in a long.
+// value = new BigDecimal(text);
+// }
+
+ long l = value.longValue();
+ if (Integer.MIN_VALUE <= l && l <= Integer.MAX_VALUE) {
+ value = Integer.valueOf(value.intValue());
+ }
+ return new ConstantExpression(value);
+ }
+
+ public static ConstantExpression createFloat(String text) {
+ Double value = new Double(text);
+ if (value > Double.MAX_VALUE) {
+ throw new RuntimeException(text + " is greater than " + Double.MAX_VALUE);
+ }
+ if (value < Double.MIN_VALUE) {
+ throw new RuntimeException(text + " is less than " + Double.MIN_VALUE);
+ }
+ return new ConstantExpression(value);
+ }
+
+ public static ConstantExpression createNow() {
+ return new NowExpression();
+ }
+
+ public Object evaluate(EvaluationContext context) throws Exception {
+ return value;
+ }
+
+ public Object getValue() {
+ return value;
+ }
+
+ /**
+ * @see Object#toString()
+ */
+ public String toString() {
+ Object value = getValue();
+ if (value == null) {
+ return "NULL";
+ }
+ if (value instanceof Boolean) {
+ return ((Boolean) value).booleanValue() ? "TRUE" : "FALSE";
+ }
+ if (value instanceof String) {
+ return encodeString((String) value);
+ }
+ return value.toString();
+ }
+
+ /**
+ * @see Object#hashCode()
+ */
+ public int hashCode() {
+ return toString().hashCode();
+ }
+
+ /**
+ * @see Object#equals(Object)
+ */
+ public boolean equals(Object o) {
+
+ if (o == null || !this.getClass().equals(o.getClass())) {
+ return false;
+ }
+ return toString().equals(o.toString());
+
+ }
+
+ /**
+ * Encodes the value of string so that it looks like it would look like when
+ * it was provided in a selector.
+ *
+ * @return
+ */
+ public static String encodeString(String s) {
+ StringBuffer b = new StringBuffer();
+ b.append('\'');
+ for (int i = 0; i < s.length(); i++) {
+ char c = s.charAt(i);
+ if (c == '\'') {
+ b.append(c);
+ }
+ b.append(c);
+ }
+ b.append('\'');
+ return b.toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/filter/src/main/java/org/apache/rocketmq/filter/expression/EmptyEvaluationContext.java
----------------------------------------------------------------------
diff --git a/filter/src/main/java/org/apache/rocketmq/filter/expression/EmptyEvaluationContext.java b/filter/src/main/java/org/apache/rocketmq/filter/expression/EmptyEvaluationContext.java
new file mode 100644
index 0000000..52af2d0
--- /dev/null
+++ b/filter/src/main/java/org/apache/rocketmq/filter/expression/EmptyEvaluationContext.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.filter.expression;
+
+import java.util.Map;
+
+/**
+ * Empty context: an {@link EvaluationContext} that holds no values.
+ * Both accessors always return {@code null}.
+ */
+public class EmptyEvaluationContext implements EvaluationContext {
+ /**
+ * @param name ignored
+ * @return always {@code null}
+ */
+ @Override
+ public Object get(String name) {
+ return null;
+ }
+
+ /**
+ * @return always {@code null} (not an empty map)
+ */
+ @Override
+ public Map<String, Object> keyValues() {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/filter/src/main/java/org/apache/rocketmq/filter/expression/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/filter/src/main/java/org/apache/rocketmq/filter/expression/EvaluationContext.java b/filter/src/main/java/org/apache/rocketmq/filter/expression/EvaluationContext.java
new file mode 100644
index 0000000..094ef53
--- /dev/null
+++ b/filter/src/main/java/org/apache/rocketmq/filter/expression/EvaluationContext.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.filter.expression;
+
+import java.util.Map;
+
+/**
+ * Context against which an expression is evaluated.
+ *
+ * Compared to org.apache.activemq.filter.MessageEvaluationContext, this is just an interface.
+ */
+public interface EvaluationContext {
+
+ /**
+ * Get value by name from context.
+ *
+ * @param name name of the value to look up
+ * @return the value held under {@code name}; may be {@code null}
+ */
+ Object get(String name);
+
+ /**
+ * Context variables.
+ *
+ * @return the context's variables as a name-to-value map; may be {@code null}
+ */
+ Map<String, Object> keyValues();
+}