You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/08/01 07:23:25 UTC
[27/50] [abbrv] incubator-rocketmq git commit: [ROCKETMQ-121]Support
message filtering based on SQL92 closes apache/incubator-rocketmq#82
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
index 6349ffc..67807a8 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
@@ -22,15 +22,19 @@ import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
+import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.CheckClientRequestBody;
import org.apache.rocketmq.common.protocol.header.UnregisterClientRequestHeader;
import org.apache.rocketmq.common.protocol.header.UnregisterClientResponseHeader;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
import org.apache.rocketmq.common.protocol.heartbeat.ProducerData;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
+import org.apache.rocketmq.filter.FilterFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
@@ -54,6 +58,8 @@ public class ClientManageProcessor implements NettyRequestProcessor {
return this.heartBeat(ctx, request);
case RequestCode.UNREGISTER_CLIENT:
return this.unregisterClient(ctx, request);
+ case RequestCode.CHECK_CLIENT_CONFIG:
+ return this.checkClientConfig(ctx, request);
default:
break;
}
@@ -157,4 +163,42 @@ public class ClientManageProcessor implements NettyRequestProcessor {
response.setRemark(null);
return response;
}
+
+ public RemotingCommand checkClientConfig(ChannelHandlerContext ctx, RemotingCommand request)
+ throws RemotingCommandException {
+ final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+
+ CheckClientRequestBody requestBody = CheckClientRequestBody.decode(request.getBody(),
+ CheckClientRequestBody.class);
+
+ if (requestBody != null && requestBody.getSubscriptionData() != null) {
+ SubscriptionData subscriptionData = requestBody.getSubscriptionData();
+
+ if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
+ response.setCode(ResponseCode.SUCCESS);
+ response.setRemark(null);
+ return response;
+ }
+
+ if (!this.brokerController.getBrokerConfig().isEnablePropertyFilter()) {
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("The broker does not support consumer to filter message by " + subscriptionData.getExpressionType());
+ return response;
+ }
+
+ try {
+ FilterFactory.INSTANCE.get(subscriptionData.getExpressionType()).compile(subscriptionData.getSubString());
+ } catch (Exception e) {
+ log.warn("Client {}@{} filter message, but failed to compile expression! sub={}, error={}",
+ requestBody.getClientId(), requestBody.getGroup(), requestBody.getSubscriptionData(), e.getMessage());
+ response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
+ response.setRemark(e.getMessage());
+ return response;
+ }
+ }
+
+ response.setCode(ResponseCode.SUCCESS);
+ response.setRemark(null);
+ return response;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index 89967d8..10945da 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -25,6 +25,10 @@ import java.nio.ByteBuffer;
import java.util.List;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
+import org.apache.rocketmq.broker.filter.ConsumerFilterData;
+import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
+import org.apache.rocketmq.broker.filter.ExpressionForRetryMessageFilter;
+import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
import org.apache.rocketmq.broker.longpolling.PullRequest;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
@@ -34,6 +38,7 @@ import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicFilterType;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
+import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.filter.FilterAPI;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.message.MessageDecoder;
@@ -54,6 +59,7 @@ import org.apache.rocketmq.remoting.netty.RequestTask;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.MessageExtBrokerInner;
+import org.apache.rocketmq.store.MessageFilter;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
@@ -142,13 +148,22 @@ public class PullMessageProcessor implements NettyRequestProcessor {
}
SubscriptionData subscriptionData = null;
+ ConsumerFilterData consumerFilterData = null;
if (hasSubscriptionFlag) {
try {
- subscriptionData = FilterAPI.buildSubscriptionData(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
- requestHeader.getSubscription());
+ subscriptionData = FilterAPI.build(
+ requestHeader.getTopic(), requestHeader.getSubscription(), requestHeader.getExpressionType()
+ );
+ if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
+ consumerFilterData = ConsumerFilterManager.build(
+ requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getSubscription(),
+ requestHeader.getExpressionType(), requestHeader.getSubVersion()
+ );
+ assert consumerFilterData != null;
+ }
} catch (Exception e) {
LOG.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(), //
- requestHeader.getConsumerGroup());
+ requestHeader.getConsumerGroup());
response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
response.setRemark("parse the consumer's subscription failed");
return response;
@@ -180,16 +195,48 @@ public class PullMessageProcessor implements NettyRequestProcessor {
if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) {
LOG.warn("The broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(),
- subscriptionData.getSubString());
+ subscriptionData.getSubString());
response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST);
response.setRemark("the consumer's subscription not latest");
return response;
}
+ if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
+ consumerFilterData = this.brokerController.getConsumerFilterManager().get(requestHeader.getTopic(),
+ requestHeader.getConsumerGroup());
+ if (consumerFilterData == null) {
+ response.setCode(ResponseCode.FILTER_DATA_NOT_EXIST);
+ response.setRemark("The broker's consumer filter data is not exist!Your expression may be wrong!");
+ return response;
+ }
+ if (consumerFilterData.getClientVersion() < requestHeader.getSubVersion()) {
+ LOG.warn("The broker's consumer filter data is not latest, group: {}, topic: {}, serverV: {}, clientV: {}",
+ requestHeader.getConsumerGroup(), requestHeader.getTopic(), consumerFilterData.getClientVersion(), requestHeader.getSubVersion());
+ response.setCode(ResponseCode.FILTER_DATA_NOT_LATEST);
+ response.setRemark("the consumer's consumer filter data not latest");
+ return response;
+ }
+ }
+ }
+
+ if (!ExpressionType.isTagType(subscriptionData.getExpressionType())
+ && !this.brokerController.getBrokerConfig().isEnablePropertyFilter()) {
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("The broker does not support consumer to filter message by " + subscriptionData.getExpressionType());
+ return response;
+ }
+
+ MessageFilter messageFilter;
+ if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {
+ messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,
+ this.brokerController.getConsumerFilterManager());
+ } else {
+ messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,
+ this.brokerController.getConsumerFilterManager());
}
final GetMessageResult getMessageResult =
this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
- requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), subscriptionData);
+ requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
if (getMessageResult != null) {
response.setRemark(getMessageResult.getStatus().name());
responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
@@ -368,7 +415,7 @@ public class PullMessageProcessor implements NettyRequestProcessor {
long offset = requestHeader.getQueueOffset();
int queueId = requestHeader.getQueueId();
PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
- this.brokerController.getMessageStore().now(), offset, subscriptionData);
+ this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
response = null;
break;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/broker/src/test/java/org/apache/rocketmq/broker/filter/CommitLogDispatcherCalcBitMapTest.java
----------------------------------------------------------------------
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/filter/CommitLogDispatcherCalcBitMapTest.java b/broker/src/test/java/org/apache/rocketmq/broker/filter/CommitLogDispatcherCalcBitMapTest.java
new file mode 100644
index 0000000..87f6256
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/filter/CommitLogDispatcherCalcBitMapTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.filter;
+
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.filter.ExpressionType;
+import org.apache.rocketmq.filter.util.BitsArray;
+import org.apache.rocketmq.store.DispatchRequest;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class CommitLogDispatcherCalcBitMapTest {
+
+ @Test
+ public void testDispatch_filterDataIllegal() {
+ BrokerConfig brokerConfig = new BrokerConfig();
+ brokerConfig.setEnableCalcFilterBitMap(true);
+
+ ConsumerFilterManager filterManager = new ConsumerFilterManager();
+
+ filterManager.register("topic0", "CID_0", "a is not null and a >= 5",
+ ExpressionType.SQL92, System.currentTimeMillis());
+
+ filterManager.register("topic0", "CID_1", "a is not null and a >= 15",
+ ExpressionType.SQL92, System.currentTimeMillis());
+
+ ConsumerFilterData nullExpression = filterManager.get("topic0", "CID_0");
+ nullExpression.setExpression(null);
+ nullExpression.setCompiledExpression(null);
+ ConsumerFilterData nullBloomData = filterManager.get("topic0", "CID_1");
+ nullBloomData.setBloomFilterData(null);
+
+
+ CommitLogDispatcherCalcBitMap calcBitMap = new CommitLogDispatcherCalcBitMap(brokerConfig,
+ filterManager);
+
+ for (int i = 0; i < 1; i++) {
+ Map<String, String> properties = new HashMap<String, String>(4);
+ properties.put("a", String.valueOf(i * 10 + 5));
+
+ String topic = "topic" + i;
+
+ DispatchRequest dispatchRequest = new DispatchRequest(
+ topic,
+ 0,
+ i * 100 + 123,
+ 100,
+ (long) ("tags" + i).hashCode(),
+ System.currentTimeMillis(),
+ i,
+ null,
+ UUID.randomUUID().toString(),
+ 0,
+ 0,
+ properties
+ );
+
+ calcBitMap.dispatch(dispatchRequest);
+
+ assertThat(dispatchRequest.getBitMap()).isNotNull();
+
+ BitsArray bitsArray = BitsArray.create(dispatchRequest.getBitMap(),
+ filterManager.getBloomFilter().getM());
+
+ for (int j = 0; j < bitsArray.bitLength(); j++) {
+ assertThat(bitsArray.getBit(j)).isFalse();
+ }
+ }
+ }
+
+ @Test
+ public void testDispatch_blankFilterData() {
+ BrokerConfig brokerConfig = new BrokerConfig();
+ brokerConfig.setEnableCalcFilterBitMap(true);
+
+ ConsumerFilterManager filterManager = new ConsumerFilterManager();
+
+ CommitLogDispatcherCalcBitMap calcBitMap = new CommitLogDispatcherCalcBitMap(brokerConfig,
+ filterManager);
+
+ for (int i = 0; i < 10; i++) {
+ Map<String, String> properties = new HashMap<String, String>(4);
+ properties.put("a", String.valueOf(i * 10 + 5));
+
+ String topic = "topic" + i;
+
+ DispatchRequest dispatchRequest = new DispatchRequest(
+ topic,
+ 0,
+ i * 100 + 123,
+ 100,
+ (long) ("tags" + i).hashCode(),
+ System.currentTimeMillis(),
+ i,
+ null,
+ UUID.randomUUID().toString(),
+ 0,
+ 0,
+ properties
+ );
+
+ calcBitMap.dispatch(dispatchRequest);
+
+ assertThat(dispatchRequest.getBitMap()).isNull();
+ }
+ }
+
+ @Test
+ public void testDispatch() {
+ BrokerConfig brokerConfig = new BrokerConfig();
+ brokerConfig.setEnableCalcFilterBitMap(true);
+
+ ConsumerFilterManager filterManager = ConsumerFilterManagerTest.gen(10, 10);
+
+ CommitLogDispatcherCalcBitMap calcBitMap = new CommitLogDispatcherCalcBitMap(brokerConfig,
+ filterManager);
+
+ for (int i = 0; i < 10; i++) {
+ Map<String, String> properties = new HashMap<String, String>(4);
+ properties.put("a", String.valueOf(i * 10 + 5));
+
+ String topic = "topic" + i;
+
+ DispatchRequest dispatchRequest = new DispatchRequest(
+ topic,
+ 0,
+ i * 100 + 123,
+ 100,
+ (long) ("tags" + i).hashCode(),
+ System.currentTimeMillis(),
+ i,
+ null,
+ UUID.randomUUID().toString(),
+ 0,
+ 0,
+ properties
+ );
+
+ calcBitMap.dispatch(dispatchRequest);
+
+ assertThat(dispatchRequest.getBitMap()).isNotNull();
+
+ BitsArray bits = BitsArray.create(dispatchRequest.getBitMap());
+
+ Collection<ConsumerFilterData> filterDatas = filterManager.get(topic);
+
+ for (ConsumerFilterData filterData : filterDatas) {
+
+ if (filterManager.getBloomFilter().isHit(filterData.getBloomFilterData(), bits)) {
+ try {
+ assertThat((Boolean) filterData.getCompiledExpression().evaluate(
+ new MessageEvaluationContext(properties)
+ )).isTrue();
+ } catch (Exception e) {
+ e.printStackTrace();
+ assertThat(true).isFalse();
+ }
+ } else {
+ try {
+ assertThat((Boolean) filterData.getCompiledExpression().evaluate(
+ new MessageEvaluationContext(properties)
+ )).isFalse();
+ } catch (Exception e) {
+ e.printStackTrace();
+ assertThat(true).isFalse();
+ }
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/broker/src/test/java/org/apache/rocketmq/broker/filter/ConsumerFilterManagerTest.java
----------------------------------------------------------------------
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/filter/ConsumerFilterManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/filter/ConsumerFilterManagerTest.java
new file mode 100644
index 0000000..c8412a8
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/filter/ConsumerFilterManagerTest.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.filter;
+
+import org.apache.rocketmq.common.filter.ExpressionType;
+import org.apache.rocketmq.common.filter.FilterAPI;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class ConsumerFilterManagerTest {
+
+ public static ConsumerFilterManager gen(int topicCount, int consumerCount) {
+ ConsumerFilterManager filterManager = new ConsumerFilterManager();
+
+ for (int i = 0; i < topicCount; i++) {
+ String topic = "topic" + i;
+
+ for (int j = 0; j < consumerCount; j++) {
+
+ String consumer = "CID_" + j;
+
+ filterManager.register(topic, consumer, expr(j), ExpressionType.SQL92, System.currentTimeMillis());
+ }
+ }
+
+ return filterManager;
+ }
+
+ public static String expr(int i) {
+ return "a is not null and a > " + ((i - 1) * 10) + " and a < " + ((i + 1) * 10);
+ }
+
+ @Test
+ public void testRegister_newExpressionCompileErrorAndRemoveOld() {
+ ConsumerFilterManager filterManager = gen(10, 10);
+
+ assertThat(filterManager.get("topic9", "CID_9")).isNotNull();
+
+ String newExpr = "a between 10,20";
+
+ assertThat(filterManager.register("topic9", "CID_9", newExpr, ExpressionType.SQL92, System.currentTimeMillis() + 1))
+ .isFalse();
+ assertThat(filterManager.get("topic9", "CID_9")).isNull();
+
+ newExpr = "a between 10 AND 20";
+
+ assertThat(filterManager.register("topic9", "CID_9", newExpr, ExpressionType.SQL92, System.currentTimeMillis() + 1))
+ .isTrue();
+
+ ConsumerFilterData filterData = filterManager.get("topic9", "CID_9");
+
+ assertThat(filterData).isNotNull();
+ assertThat(newExpr).isEqualTo(filterData.getExpression());
+ }
+
+ @Test
+ public void testRegister_change() {
+ ConsumerFilterManager filterManager = gen(10, 10);
+
+ ConsumerFilterData filterData = filterManager.get("topic9", "CID_9");
+
+ System.out.println(filterData.getCompiledExpression());
+
+ String newExpr = "a > 0 and a < 10";
+
+ filterManager.register("topic9", "CID_9", newExpr, ExpressionType.SQL92, System.currentTimeMillis() + 1);
+
+ filterData = filterManager.get("topic9", "CID_9");
+
+ assertThat(newExpr).isEqualTo(filterData.getExpression());
+
+ System.out.println(filterData.toString());
+ }
+
+ @Test
+ public void testRegister() {
+ ConsumerFilterManager filterManager = gen(10, 10);
+
+ ConsumerFilterData filterData = filterManager.get("topic9", "CID_9");
+
+ assertThat(filterData).isNotNull();
+ assertThat(filterData.isDead()).isFalse();
+
+ // new version
+ assertThat(filterManager.register(
+ "topic9", "CID_9", "a is not null", ExpressionType.SQL92, System.currentTimeMillis() + 1000
+ )).isTrue();
+
+ ConsumerFilterData newFilter = filterManager.get("topic9", "CID_9");
+
+ assertThat(newFilter).isNotEqualTo(filterData);
+
+ // same version
+ assertThat(filterManager.register(
+ "topic9", "CID_9", "a is null", ExpressionType.SQL92, newFilter.getClientVersion()
+ )).isFalse();
+
+ ConsumerFilterData filterData1 = filterManager.get("topic9", "CID_9");
+
+ assertThat(newFilter).isEqualTo(filterData1);
+ }
+
+ @Test
+ public void testRegister_reAlive() {
+ ConsumerFilterManager filterManager = gen(10, 10);
+
+ ConsumerFilterData filterData = filterManager.get("topic9", "CID_9");
+
+ assertThat(filterData).isNotNull();
+ assertThat(filterData.isDead()).isFalse();
+
+ //make dead
+ filterManager.unRegister("CID_9");
+
+ //reAlive
+ filterManager.register(
+ filterData.getTopic(),
+ filterData.getConsumerGroup(),
+ filterData.getExpression(),
+ filterData.getExpressionType(),
+ System.currentTimeMillis()
+ );
+
+ ConsumerFilterData newFilterData = filterManager.get("topic9", "CID_9");
+
+ assertThat(newFilterData).isNotNull();
+ assertThat(newFilterData.isDead()).isFalse();
+ }
+
+ @Test
+ public void testRegister_bySubscriptionData() {
+ ConsumerFilterManager filterManager = new ConsumerFilterManager();
+ List<SubscriptionData> subscriptionDatas = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ try {
+ subscriptionDatas.add(
+ FilterAPI.build(
+ "topic" + i,
+ "a is not null and a > " + i,
+ ExpressionType.SQL92
+ )
+ );
+ } catch (Exception e) {
+ e.printStackTrace();
+ assertThat(true).isFalse();
+ }
+ }
+
+ filterManager.register("CID_0", subscriptionDatas);
+
+ Collection<ConsumerFilterData> filterDatas = filterManager.getByGroup("CID_0");
+
+ assertThat(filterDatas).isNotNull();
+ assertThat(filterDatas.size()).isEqualTo(10);
+
+ Iterator<ConsumerFilterData> iterator = filterDatas.iterator();
+ while (iterator.hasNext()) {
+ ConsumerFilterData filterData = iterator.next();
+
+ assertThat(filterData).isNotNull();
+ assertThat(filterManager.getBloomFilter().isValid(filterData.getBloomFilterData())).isTrue();
+ }
+ }
+
+ @Test
+ public void testRegister_tag() {
+ ConsumerFilterManager filterManager = new ConsumerFilterManager();
+
+ assertThat(filterManager.register("topic0", "CID_0", "*", null, System.currentTimeMillis())).isFalse();
+
+ Collection<ConsumerFilterData> filterDatas = filterManager.getByGroup("CID_0");
+
+ assertThat(filterDatas).isNullOrEmpty();
+ }
+
+ @Test
+ public void testUnregister() {
+ ConsumerFilterManager filterManager = gen(10, 10);
+
+ ConsumerFilterData filterData = filterManager.get("topic9", "CID_9");
+
+ assertThat(filterData).isNotNull();
+ assertThat(filterData.isDead()).isFalse();
+
+ filterManager.unRegister("CID_9");
+
+ assertThat(filterData.isDead()).isTrue();
+ }
+
+ @Test
+ public void testPersist() {
+ ConsumerFilterManager filterManager = gen(10, 10);
+
+ try {
+ filterManager.persist();
+
+ ConsumerFilterData filterData = filterManager.get("topic9", "CID_9");
+
+ assertThat(filterData).isNotNull();
+ assertThat(filterData.isDead()).isFalse();
+
+ ConsumerFilterManager loadFilter = new ConsumerFilterManager();
+
+ assertThat(loadFilter.load()).isTrue();
+
+ filterData = loadFilter.get("topic9", "CID_9");
+
+ assertThat(filterData).isNotNull();
+ assertThat(filterData.isDead()).isTrue();
+ assertThat(filterData.getCompiledExpression()).isNotNull();
+ } finally {
+ deleteDirectory("./unit_test");
+ }
+ }
+
+ @Test
+ public void testPersist_clean() {
+ ConsumerFilterManager filterManager = gen(10, 10);
+
+ String topic = "topic9";
+ for (int i = 0; i < 10; i++) {
+ String cid = "CID_" + i;
+
+ ConsumerFilterData filterData = filterManager.get(topic, cid);
+
+ assertThat(filterData).isNotNull();
+ assertThat(filterData.isDead()).isFalse();
+
+ //make dead more than 24h
+ filterData.setBornTime(System.currentTimeMillis() - 26 * 60 * 60 * 1000);
+ filterData.setDeadTime(System.currentTimeMillis() - 25 * 60 * 60 * 1000);
+ }
+
+ try {
+ filterManager.persist();
+
+ ConsumerFilterManager loadFilter = new ConsumerFilterManager();
+
+ assertThat(loadFilter.load()).isTrue();
+
+ ConsumerFilterData filterData = loadFilter.get(topic, "CID_9");
+
+ assertThat(filterData).isNull();
+
+ Collection<ConsumerFilterData> topicData = loadFilter.get(topic);
+
+ assertThat(topicData).isNullOrEmpty();
+ } finally {
+ deleteDirectory("./unit_test");
+ }
+ }
+
+ protected void deleteDirectory(String rootPath) {
+ File file = new File(rootPath);
+ deleteFile(file);
+ }
+
+ protected void deleteFile(File file) {
+ File[] subFiles = file.listFiles();
+ if (subFiles != null) {
+ for (File sub : subFiles) {
+ deleteFile(sub);
+ }
+ }
+
+ file.delete();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java
----------------------------------------------------------------------
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java b/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java
new file mode 100644
index 0000000..53e563e
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.filter;
+
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.filter.ExpressionType;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.store.CommitLogDispatcher;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.DispatchRequest;
+import org.apache.rocketmq.store.GetMessageResult;
+import org.apache.rocketmq.store.GetMessageStatus;
+import org.apache.rocketmq.store.MessageArrivingListener;
+import org.apache.rocketmq.store.MessageExtBrokerInner;
+import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
+import org.junit.Test;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class MessageStoreWithFilterTest {
+
+ private static final String msg = "Once, there was a chance for me!";
+ private static final byte[] msgBody = msg.getBytes();
+
+ private static final String topic = "topic";
+ private static final int queueId = 0;
+ private static final String storePath = "." + File.separator + "unit_test_store";
+ private static final int commitLogFileSize = 1024 * 1024 * 256;
+ private static final int cqFileSize = 300000 * 20;
+ private static final int cqExtFileSize = 300000 * 128;
+
+ private static SocketAddress BornHost;
+
+ private static SocketAddress StoreHost;
+
+ static {
+ try {
+ StoreHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123);
+ } catch (UnknownHostException e) {
+ e.printStackTrace();
+ }
+ try {
+ BornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0);
+ } catch (UnknownHostException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public MessageExtBrokerInner buildMessage() {
+ MessageExtBrokerInner msg = new MessageExtBrokerInner();
+ msg.setTopic(topic);
+ msg.setTags("TAG1");
+ msg.setKeys("Hello");
+ msg.setBody(msgBody);
+ msg.setKeys(String.valueOf(System.currentTimeMillis()));
+ msg.setQueueId(queueId);
+ msg.setSysFlag(0);
+ msg.setBornTimestamp(System.currentTimeMillis());
+ msg.setStoreHost(StoreHost);
+ msg.setBornHost(BornHost);
+ for (int i = 1; i < 3; i++) {
+ msg.putUserProperty(String.valueOf(i), "imagoodperson" + i);
+ }
+ msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
+
+ return msg;
+ }
+
+ public MessageStoreConfig buildStoreConfig(int commitLogFileSize, int cqFileSize,
+ boolean enableCqExt, int cqExtFileSize) {
+ MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+ messageStoreConfig.setMapedFileSizeCommitLog(commitLogFileSize);
+ messageStoreConfig.setMapedFileSizeConsumeQueue(cqFileSize);
+ messageStoreConfig.setMappedFileSizeConsumeQueueExt(cqExtFileSize);
+ messageStoreConfig.setMessageIndexEnable(false);
+ messageStoreConfig.setEnableConsumeQueueExt(enableCqExt);
+
+ messageStoreConfig.setStorePathRootDir(storePath);
+ messageStoreConfig.setStorePathCommitLog(storePath + File.separator + "commitlog");
+
+ return messageStoreConfig;
+ }
+
+ protected DefaultMessageStore gen(ConsumerFilterManager filterManager) throws Exception {
+ MessageStoreConfig messageStoreConfig = buildStoreConfig(
+ commitLogFileSize, cqFileSize, true, cqExtFileSize
+ );
+
+ BrokerConfig brokerConfig = new BrokerConfig();
+ brokerConfig.setEnableCalcFilterBitMap(true);
+ brokerConfig.setMaxErrorRateOfBloomFilter(20);
+ brokerConfig.setExpectConsumerNumUseFilter(64);
+
+ DefaultMessageStore master = new DefaultMessageStore(
+ messageStoreConfig,
+ new BrokerStatsManager(brokerConfig.getBrokerClusterName()),
+ new MessageArrivingListener() {
+ @Override
+ public void arriving(String topic, int queueId, long logicOffset, long tagsCode,
+ long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
+// System.out.println(String.format("Msg coming: %s, %d, %d, %d",
+// topic, queueId, logicOffset, tagsCode));
+ }
+ }
+ , brokerConfig);
+
+ master.getDispatcherList().addFirst(new CommitLogDispatcher() {
+ @Override
+ public void dispatch(DispatchRequest request) {
+ try {
+// System.out.println(String.format("offset:%d, bitMap:%s", request.getCommitLogOffset(),
+// BitsArray.create(request.getBitMap()).toString()));
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ master.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(brokerConfig, filterManager));
+
+ assertThat(master.load()).isTrue();
+
+ master.start();
+
+ return master;
+ }
+
+ protected List<MessageExtBrokerInner> putMsg(DefaultMessageStore master, int topicCount, int msgCountPerTopic) throws Exception {
+ List<MessageExtBrokerInner> msgs = new ArrayList<MessageExtBrokerInner>();
+ for (int i = 0; i < topicCount; i++) {
+ String realTopic = topic + i;
+ for (int j = 0; j < msgCountPerTopic; j++) {
+ MessageExtBrokerInner msg = buildMessage();
+ msg.setTopic(realTopic);
+ msg.putUserProperty("a", String.valueOf(j * 10 + 5));
+ msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
+
+ PutMessageResult result = master.putMessage(msg);
+
+ msg.setMsgId(result.getAppendMessageResult().getMsgId());
+
+ msgs.add(msg);
+ }
+ }
+
+ return msgs;
+ }
+
+ protected void deleteDirectory(String rootPath) {
+ File file = new File(rootPath);
+ deleteFile(file);
+ }
+
+ protected void deleteFile(File file) {
+ File[] subFiles = file.listFiles();
+ if (subFiles != null) {
+ for (File sub : subFiles) {
+ deleteFile(sub);
+ }
+ }
+
+ file.delete();
+ }
+
+ protected List<MessageExtBrokerInner> filtered(List<MessageExtBrokerInner> msgs, ConsumerFilterData filterData) {
+ List<MessageExtBrokerInner> filteredMsgs = new ArrayList<MessageExtBrokerInner>();
+
+ for (MessageExtBrokerInner messageExtBrokerInner : msgs) {
+
+ if (!messageExtBrokerInner.getTopic().equals(filterData.getTopic())) {
+ continue;
+ }
+
+ try {
+ Object evlRet = filterData.getCompiledExpression().evaluate(new MessageEvaluationContext(messageExtBrokerInner.getProperties()));
+
+ if (evlRet == null || !(evlRet instanceof Boolean) || (Boolean) evlRet) {
+ filteredMsgs.add(messageExtBrokerInner);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ assertThat(true).isFalse();
+ }
+ }
+
+ return filteredMsgs;
+ }
+
+ @Test
+ public void testGetMessage_withFilterBitMapAndConsumerChanged() {
+ int topicCount = 10, msgPerTopic = 10;
+ ConsumerFilterManager filterManager = ConsumerFilterManagerTest.gen(topicCount, msgPerTopic);
+
+ DefaultMessageStore master = null;
+ try {
+ master = gen(filterManager);
+ } catch (Exception e) {
+ e.printStackTrace();
+ assertThat(true).isFalse();
+ }
+
+ try {
+ List<MessageExtBrokerInner> msgs = null;
+ try {
+ msgs = putMsg(master, topicCount, msgPerTopic);
+ } catch (Exception e) {
+ e.printStackTrace();
+ assertThat(true).isFalse();
+ }
+
+ // sleep to wait for consume queue has been constructed.
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ assertThat(true).isFalse();
+ }
+
+ // reset consumer;
+ String topic = "topic" + 0;
+ String resetGroup = "CID_" + 2;
+ String normalGroup = "CID_" + 3;
+
+ {
+ // reset CID_2@topic0 to get all messages.
+ SubscriptionData resetSubData = new SubscriptionData();
+ resetSubData.setExpressionType(ExpressionType.SQL92);
+ resetSubData.setTopic(topic);
+ resetSubData.setClassFilterMode(false);
+ resetSubData.setSubString("a is not null OR a is null");
+
+ ConsumerFilterData resetFilterData = ConsumerFilterManager.build(topic,
+ resetGroup, resetSubData.getSubString(), resetSubData.getExpressionType(),
+ System.currentTimeMillis());
+
+ GetMessageResult resetGetResult = master.getMessage(resetGroup, topic, queueId, 0, 1000,
+ new ExpressionMessageFilter(resetSubData, resetFilterData, filterManager));
+
+ try {
+ assertThat(resetGetResult).isNotNull();
+
+ List<MessageExtBrokerInner> filteredMsgs = filtered(msgs, resetFilterData);
+
+ assertThat(resetGetResult.getMessageBufferList().size()).isEqualTo(filteredMsgs.size());
+ } finally {
+ resetGetResult.release();
+ }
+ }
+
+ {
+ ConsumerFilterData normalFilterData = filterManager.get(topic, normalGroup);
+ assertThat(normalFilterData).isNotNull();
+ assertThat(normalFilterData.getBornTime()).isLessThan(System.currentTimeMillis());
+
+ SubscriptionData normalSubData = new SubscriptionData();
+ normalSubData.setExpressionType(normalFilterData.getExpressionType());
+ normalSubData.setTopic(topic);
+ normalSubData.setClassFilterMode(false);
+ normalSubData.setSubString(normalFilterData.getExpression());
+
+ List<MessageExtBrokerInner> filteredMsgs = filtered(msgs, normalFilterData);
+
+ GetMessageResult normalGetResult = master.getMessage(normalGroup, topic, queueId, 0, 1000,
+ new ExpressionMessageFilter(normalSubData, normalFilterData, filterManager));
+
+ try {
+ assertThat(normalGetResult).isNotNull();
+ assertThat(normalGetResult.getMessageBufferList().size()).isEqualTo(filteredMsgs.size());
+ } finally {
+ normalGetResult.release();
+ }
+ }
+ } finally {
+ master.shutdown();
+ master.destroy();
+ deleteDirectory(storePath);
+ }
+ }
+
+ @Test
+ public void testGetMessage_withFilterBitMap() {
+ int topicCount = 10, msgPerTopic = 500;
+ ConsumerFilterManager filterManager = ConsumerFilterManagerTest.gen(topicCount, msgPerTopic);
+
+ DefaultMessageStore master = null;
+ try {
+ master = gen(filterManager);
+ } catch (Exception e) {
+ e.printStackTrace();
+ assertThat(true).isFalse();
+ }
+
+ try {
+ List<MessageExtBrokerInner> msgs = null;
+ try {
+ msgs = putMsg(master, topicCount, msgPerTopic);
+ // sleep to wait for consume queue has been constructed.
+ Thread.sleep(1000);
+ } catch (Exception e) {
+ e.printStackTrace();
+ assertThat(true).isFalse();
+ }
+
+ for (int i = 0; i < topicCount; i++) {
+ String realTopic = topic + i;
+
+ for (int j = 0; j < msgPerTopic; j++) {
+ String group = "CID_" + j;
+
+ ConsumerFilterData filterData = filterManager.get(realTopic, group);
+ assertThat(filterData).isNotNull();
+
+ List<MessageExtBrokerInner> filteredMsgs = filtered(msgs, filterData);
+
+ SubscriptionData subscriptionData = new SubscriptionData();
+ subscriptionData.setExpressionType(filterData.getExpressionType());
+ subscriptionData.setTopic(filterData.getTopic());
+ subscriptionData.setClassFilterMode(false);
+ subscriptionData.setSubString(filterData.getExpression());
+
+ GetMessageResult getMessageResult = master.getMessage(group, realTopic, queueId, 0, 10000,
+ new ExpressionMessageFilter(subscriptionData, filterData, filterManager));
+ String assertMsg = group + "-" + realTopic;
+ try {
+ assertThat(getMessageResult).isNotNull();
+ assertThat(GetMessageStatus.FOUND).isEqualTo(getMessageResult.getStatus());
+ assertThat(getMessageResult.getMessageBufferList()).isNotNull().isNotEmpty();
+ assertThat(getMessageResult.getMessageBufferList().size()).isEqualTo(filteredMsgs.size());
+
+ for (ByteBuffer buffer : getMessageResult.getMessageBufferList()) {
+ MessageExt messageExt = MessageDecoder.decode(buffer.slice(), false);
+ assertThat(messageExt).isNotNull();
+
+ Object evlRet = null;
+ try {
+ evlRet = filterData.getCompiledExpression().evaluate(new MessageEvaluationContext(messageExt.getProperties()));
+ } catch (Exception e) {
+ e.printStackTrace();
+ assertThat(true).isFalse();
+ }
+
+ assertThat(evlRet).isNotNull().isEqualTo(Boolean.TRUE);
+
+ // check
+ boolean find = false;
+ for (MessageExtBrokerInner messageExtBrokerInner : filteredMsgs) {
+ if (messageExtBrokerInner.getMsgId().equals(messageExt.getMsgId())) {
+ find = true;
+ }
+ }
+ assertThat(find).isTrue();
+ }
+ } finally {
+ getMessageResult.release();
+ }
+ }
+ }
+ } finally {
+ master.shutdown();
+ master.destroy();
+ deleteDirectory(storePath);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
----------------------------------------------------------------------
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
index d3d9812..941d4a7 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Set;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
+import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
import org.apache.rocketmq.common.BrokerConfig;
@@ -126,7 +127,7 @@ public class PullMessageProcessorTest {
@Test
public void testProcessRequest_Found() throws RemotingCommandException {
GetMessageResult getMessageResult = createGetMessageResult();
- when(messageStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any(SubscriptionData.class))).thenReturn(getMessageResult);
+ when(messageStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any(ExpressionMessageFilter.class))).thenReturn(getMessageResult);
final RemotingCommand request = createPullMsgCommand(RequestCode.PULL_MESSAGE);
RemotingCommand response = pullMessageProcessor.processRequest(handlerContext, request);
@@ -137,7 +138,7 @@ public class PullMessageProcessorTest {
@Test
public void testProcessRequest_FoundWithHook() throws RemotingCommandException {
GetMessageResult getMessageResult = createGetMessageResult();
- when(messageStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any(SubscriptionData.class))).thenReturn(getMessageResult);
+ when(messageStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any(ExpressionMessageFilter.class))).thenReturn(getMessageResult);
List<ConsumeMessageHook> consumeMessageHookList = new ArrayList<>();
final ConsumeMessageContext[] messageContext = new ConsumeMessageContext[1];
ConsumeMessageHook consumeMessageHook = new ConsumeMessageHook() {
@@ -168,7 +169,7 @@ public class PullMessageProcessorTest {
public void testProcessRequest_MsgWasRemoving() throws RemotingCommandException {
GetMessageResult getMessageResult = createGetMessageResult();
getMessageResult.setStatus(GetMessageStatus.MESSAGE_WAS_REMOVING);
- when(messageStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any(SubscriptionData.class))).thenReturn(getMessageResult);
+ when(messageStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any(ExpressionMessageFilter.class))).thenReturn(getMessageResult);
final RemotingCommand request = createPullMsgCommand(RequestCode.PULL_MESSAGE);
RemotingCommand response = pullMessageProcessor.processRequest(handlerContext, request);
@@ -180,7 +181,7 @@ public class PullMessageProcessorTest {
public void testProcessRequest_NoMsgInQueue() throws RemotingCommandException {
GetMessageResult getMessageResult = createGetMessageResult();
getMessageResult.setStatus(GetMessageStatus.NO_MESSAGE_IN_QUEUE);
- when(messageStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any(SubscriptionData.class))).thenReturn(getMessageResult);
+ when(messageStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any(ExpressionMessageFilter.class))).thenReturn(getMessageResult);
final RemotingCommand request = createPullMsgCommand(RequestCode.PULL_MESSAGE);
RemotingCommand response = pullMessageProcessor.processRequest(handlerContext, request);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index 3903fe2..9c9b59e 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -519,6 +519,21 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
}
/**
+ * Subscribe a topic by message selector.
+ *
+ * @see org.apache.rocketmq.client.consumer.MessageSelector#bySql
+ * @see org.apache.rocketmq.client.consumer.MessageSelector#byTag
+ *
+ * @param topic topic to consume.
+ * @param messageSelector {@link org.apache.rocketmq.client.consumer.MessageSelector}
+ * @throws MQClientException
+ */
+ @Override
+ public void subscribe(final String topic, final MessageSelector messageSelector) throws MQClientException {
+ this.defaultMQPushConsumerImpl.subscribe(topic, messageSelector);
+ }
+
+ /**
* Un-subscribe the specified topic from subscription.
* @param topic message topic
*/
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java
index 9255281..9c6c1f1 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java
@@ -70,6 +70,27 @@ public interface MQPushConsumer extends MQConsumer {
void subscribe(final String topic, final String fullClassName, final String filterClassSource) throws MQClientException;
/**
+ * Subscribe some topic with selector.
+ * <p>
+ * This interface also has the ability of {@link #subscribe(String, String)},
+ * and, support other message selection, such as {@link org.apache.rocketmq.common.filter.ExpressionType#SQL92}.
+ * </p>
+ * <p/>
+ * <p>
+ * Choose Tag: {@link MessageSelector#byTag(java.lang.String)}
+ * </p>
+ * <p/>
+ * <p>
+ * Choose SQL92: {@link MessageSelector#bySql(java.lang.String)}
+ * </p>
+ *
+ * @param topic
+ * @param selector message selector({@link MessageSelector}), can be null.
+ * @throws MQClientException
+ */
+ void subscribe(final String topic, final MessageSelector selector) throws MQClientException;
+
+ /**
* Unsubscribe consumption some topic
*
* @param topic message topic
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/client/src/main/java/org/apache/rocketmq/client/consumer/MessageSelector.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MessageSelector.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MessageSelector.java
new file mode 100644
index 0000000..35a5181
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MessageSelector.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.consumer;
+
+import org.apache.rocketmq.common.filter.ExpressionType;
+
+/**
+ *
+ * Message selector: select message at server.
+ * <p>
+ * Now, support:
+ * <li>Tag: {@link org.apache.rocketmq.common.filter.ExpressionType#TAG}
+ * </li>
+ * <li>SQL92: {@link org.apache.rocketmq.common.filter.ExpressionType#SQL92}
+ * </li>
+ * </p>
+ */
+public class MessageSelector {
+
+ /**
+ * @see org.apache.rocketmq.common.filter.ExpressionType
+ */
+ private String type;
+
+ /**
+ * expression content.
+ */
+ private String expression;
+
+ private MessageSelector(String type, String expression) {
+ this.type = type;
+ this.expression = expression;
+ }
+
+ /**
+ * Use SLQ92 to select message.
+ *
+ * @param sql if null or empty, will be treated as select all message.
+ * @return
+ */
+ public static MessageSelector bySql(String sql) {
+ return new MessageSelector(ExpressionType.SQL92, sql);
+ }
+
+ /**
+ * Use tag to select message.
+ *
+ * @param tag if null or empty or "*", will be treated as select all message.
+ * @return
+ */
+ public static MessageSelector byTag(String tag) {
+ return new MessageSelector(ExpressionType.TAG, tag);
+ }
+
+ public String getExpressionType() {
+ return type;
+ }
+
+ public String getExpression() {
+ return expression;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/client/src/main/java/org/apache/rocketmq/client/impl/FindBrokerResult.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/FindBrokerResult.java b/client/src/main/java/org/apache/rocketmq/client/impl/FindBrokerResult.java
index 295060e..4367a4c 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/FindBrokerResult.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/FindBrokerResult.java
@@ -19,10 +19,18 @@ package org.apache.rocketmq.client.impl;
public class FindBrokerResult {
private final String brokerAddr;
private final boolean slave;
+ private final int brokerVersion;
public FindBrokerResult(String brokerAddr, boolean slave) {
this.brokerAddr = brokerAddr;
this.slave = slave;
+ this.brokerVersion = 0;
+ }
+
+ public FindBrokerResult(String brokerAddr, boolean slave, int brokerVersion) {
+ this.brokerAddr = brokerAddr;
+ this.slave = slave;
+ this.brokerVersion = brokerVersion;
}
public String getBrokerAddr() {
@@ -32,4 +40,8 @@ public class FindBrokerResult {
public boolean isSlave() {
return slave;
}
+
+ public int getBrokerVersion() {
+ return brokerVersion;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index ff25334..4244bdd 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -59,6 +59,7 @@ import org.apache.rocketmq.common.namesrv.TopAddressing;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
+import org.apache.rocketmq.common.protocol.body.CheckClientRequestBody;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.common.protocol.body.ConsumeStatsList;
@@ -70,6 +71,7 @@ import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.LockBatchRequestBody;
import org.apache.rocketmq.common.protocol.body.LockBatchResponseBody;
import org.apache.rocketmq.common.protocol.body.ProducerConnection;
+import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
import org.apache.rocketmq.common.protocol.body.QueryConsumeTimeSpanBody;
import org.apache.rocketmq.common.protocol.body.QueryCorrectionOffsetBody;
import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
@@ -103,6 +105,7 @@ import org.apache.rocketmq.common.protocol.header.GetTopicStatsInfoRequestHeader
import org.apache.rocketmq.common.protocol.header.GetTopicsByClusterRequestHeader;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
+import org.apache.rocketmq.common.protocol.header.QueryConsumeQueueRequestHeader;
import org.apache.rocketmq.common.protocol.header.QueryConsumeTimeSpanRequestHeader;
import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
@@ -129,6 +132,7 @@ import org.apache.rocketmq.common.protocol.header.namesrv.PutKVConfigRequestHead
import org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerResponseHeader;
import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.remoting.InvokeCallback;
@@ -168,7 +172,7 @@ public class MQClientAPIImpl {
public MQClientAPIImpl(final NettyClientConfig nettyClientConfig, final ClientRemotingProcessor clientRemotingProcessor,
RPCHook rpcHook, final ClientConfig clientConfig) {
this.clientConfig = clientConfig;
- topAddressing = new TopAddressing(MixAll.WS_ADDR, clientConfig.getUnitName());
+ topAddressing = new TopAddressing(MixAll.getWSAddr(), clientConfig.getUnitName());
this.remotingClient = new NettyRemotingClient(nettyClientConfig, null);
this.clientRemotingProcessor = clientRemotingProcessor;
@@ -843,7 +847,7 @@ public class MQClientAPIImpl {
this.remotingClient.invokeOneway(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
}
- public void sendHearbeat(//
+ public int sendHearbeat(//
final String addr, //
final HeartbeatData heartbeatData, //
final long timeoutMillis//
@@ -855,7 +859,7 @@ public class MQClientAPIImpl {
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
- return;
+ return response.getVersion();
}
default:
break;
@@ -2024,4 +2028,51 @@ public class MQClientAPIImpl {
return configMap;
}
+ public QueryConsumeQueueResponseBody queryConsumeQueue(final String brokerAddr, final String topic, final int queueId,
+ final long index, final int count, final String consumerGroup,
+ final long timeoutMillis) throws InterruptedException,
+ RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException {
+
+ QueryConsumeQueueRequestHeader requestHeader = new QueryConsumeQueueRequestHeader();
+ requestHeader.setTopic(topic);
+ requestHeader.setQueueId(queueId);
+ requestHeader.setIndex(index);
+ requestHeader.setCount(count);
+ requestHeader.setConsumerGroup(consumerGroup);
+
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUME_QUEUE, requestHeader);
+
+ RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), brokerAddr), request, timeoutMillis);
+
+ assert response != null;
+
+ if (ResponseCode.SUCCESS == response.getCode()) {
+ return QueryConsumeQueueResponseBody.decode(response.getBody(), QueryConsumeQueueResponseBody.class);
+ }
+
+ throw new MQClientException(response.getCode(), response.getRemark());
+ }
+
+ public void checkClientInBroker(final String brokerAddr, final String consumerGroup,
+ final String clientId, final SubscriptionData subscriptionData,
+ final long timeoutMillis)
+ throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
+ RemotingConnectException, MQClientException {
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CHECK_CLIENT_CONFIG, null);
+
+ CheckClientRequestBody requestBody = new CheckClientRequestBody();
+ requestBody.setClientId(clientId);
+ requestBody.setGroup(consumerGroup);
+ requestBody.setSubscriptionData(subscriptionData);
+
+ request.setBody(requestBody.encode());
+
+ RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), brokerAddr), request, timeoutMillis);
+
+ assert response != null;
+
+ if (ResponseCode.SUCCESS != response.getCode()) {
+ throw new MQClientException(response.getCode(), response.getRemark());
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index 67f3ebe..2cafe29 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.PullCallback;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.listener.MessageListener;
@@ -405,15 +406,16 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
this.pullAPIWrapper.pullKernelImpl(//
pullRequest.getMessageQueue(), // 1
subExpression, // 2
- subscriptionData.getSubVersion(), // 3
- pullRequest.getNextOffset(), // 4
- this.defaultMQPushConsumer.getPullBatchSize(), // 5
- sysFlag, // 6
- commitOffsetValue, // 7
- BROKER_SUSPEND_MAX_TIME_MILLIS, // 8
- CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, // 9
- CommunicationMode.ASYNC, // 10
- pullCallback// 11
+ subscriptionData.getExpressionType(), // 3
+ subscriptionData.getSubVersion(), // 4
+ pullRequest.getNextOffset(), // 5
+ this.defaultMQPushConsumer.getPullBatchSize(), // 6
+ sysFlag, // 7
+ commitOffsetValue, // 8
+ BROKER_SUSPEND_MAX_TIME_MILLIS, // 9
+ CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, // 10
+ CommunicationMode.ASYNC, // 11
+ pullCallback // 12
);
} catch (Exception e) {
log.error("pullKernelImpl exception", e);
@@ -615,6 +617,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
}
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
+ this.mQClientFactory.checkClientInBroker();
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
this.mQClientFactory.rebalanceImmediately();
}
@@ -836,6 +839,25 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
}
}
+ public void subscribe(final String topic, final MessageSelector messageSelector) throws MQClientException {
+ try {
+ if (messageSelector == null) {
+ subscribe(topic, SubscriptionData.SUB_ALL);
+ return;
+ }
+
+ SubscriptionData subscriptionData = FilterAPI.build(topic,
+ messageSelector.getExpression(), messageSelector.getExpressionType());
+
+ this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
+ if (this.mQClientFactory != null) {
+ this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
+ }
+ } catch (Exception e) {
+ throw new MQClientException("subscription exception", e);
+ }
+ }
+
public void suspend() {
this.pause = true;
log.info("suspend this consumer, {}", this.defaultMQPushConsumer.getConsumerGroup());
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
index 96e21e1..304a44a 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
@@ -33,7 +33,9 @@ import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
@@ -135,6 +137,7 @@ public class PullAPIWrapper {
public PullResult pullKernelImpl(
final MessageQueue mq,
final String subExpression,
+ final String expressionType,
final long subVersion,
final long offset,
final int maxNums,
@@ -156,6 +159,14 @@ public class PullAPIWrapper {
}
if (findBrokerResult != null) {
+ {
+ // check version
+ if (!ExpressionType.isTagType(expressionType)
+ && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
+ throw new MQClientException("The broker[" + mq.getBrokerName() + ", "
+ + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);
+ }
+ }
int sysFlagInner = sysFlag;
if (findBrokerResult.isSlave()) {
@@ -173,6 +184,7 @@ public class PullAPIWrapper {
requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
requestHeader.setSubscription(subExpression);
requestHeader.setSubVersion(subVersion);
+ requestHeader.setExpressionType(expressionType);
String brokerAddr = findBrokerResult.getBrokerAddr();
if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
@@ -192,6 +204,34 @@ public class PullAPIWrapper {
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
+ public PullResult pullKernelImpl(
+ final MessageQueue mq,
+ final String subExpression,
+ final long subVersion,
+ final long offset,
+ final int maxNums,
+ final int sysFlag,
+ final long commitOffset,
+ final long brokerSuspendMaxTimeMillis,
+ final long timeoutMillis,
+ final CommunicationMode communicationMode,
+ final PullCallback pullCallback
+ ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ return pullKernelImpl(
+ mq,
+ subExpression,
+ ExpressionType.TAG,
+ subVersion, offset,
+ maxNums,
+ sysFlag,
+ commitOffset,
+ brokerSuspendMaxTimeMillis,
+ timeoutMillis,
+ communicationMode,
+ pullCallback
+ );
+ }
+
public long recalculatePullFromWhichNode(final MessageQueue mq) {
if (this.isConnectBrokerByUser()) {
return this.defaultBrokerId;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index d7e02fe..a8c65b2 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -61,6 +61,7 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.PermName;
+import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
@@ -98,6 +99,8 @@ public class MQClientInstance {
private final Lock lockHeartbeat = new ReentrantLock();
private final ConcurrentHashMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
new ConcurrentHashMap<String, HashMap<Long, String>>();
+ private final ConcurrentHashMap<String/* Broker Name */, HashMap<String/* address */, Integer>> brokerVersionTable =
+ new ConcurrentHashMap<String, HashMap<String, Integer>>();
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
@@ -404,6 +407,44 @@ public class MQClientInstance {
}
}
+ public void checkClientInBroker() throws MQClientException {
+ Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
+
+ while (it.hasNext()) {
+ Entry<String, MQConsumerInner> entry = it.next();
+ Set<SubscriptionData> subscriptionInner = entry.getValue().subscriptions();
+ if (subscriptionInner == null || subscriptionInner.isEmpty()) {
+ return;
+ }
+
+ for (SubscriptionData subscriptionData : subscriptionInner) {
+ if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
+ continue;
+ }
+ // may need to check one broker every cluster...
+ // assume that the configs of every broker in cluster are the the same.
+ String addr = findBrokerAddrByTopic(subscriptionData.getTopic());
+
+ if (addr != null) {
+ try {
+ this.getMQClientAPIImpl().checkClientInBroker(
+ addr, entry.getKey(), this.clientId, subscriptionData, 3 * 1000
+ );
+ } catch (Exception e) {
+ if (e instanceof MQClientException) {
+ throw (MQClientException) e;
+ } else {
+ throw new MQClientException("Check client in broker error, maybe because you use "
+ + subscriptionData.getExpressionType() + " to filter message, but server has not been upgraded to support!"
+ + "This error would not affect the launch of consumer, but may has impact on message receiving if you " +
+ "have use the new features which are not supported by server, please check the log!", e);
+ }
+ }
+ }
+ }
+ }
+ }
+
public void sendHeartbeatToAllBrokerWithLock() {
if (this.lockHeartbeat.tryLock()) {
try {
@@ -493,7 +534,11 @@ public class MQClientInstance {
}
try {
- this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
+ int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
+ if (!this.brokerVersionTable.containsKey(brokerName)) {
+ this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
+ }
+ this.brokerVersionTable.get(brokerName).put(addr, version);
if (times % 20 == 0) {
log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
log.info(heartbeatData.toString());
@@ -943,7 +988,7 @@ public class MQClientInstance {
}
if (found) {
- return new FindBrokerResult(brokerAddr, slave);
+ return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr));
}
return null;
@@ -982,12 +1027,21 @@ public class MQClientInstance {
}
if (found) {
- return new FindBrokerResult(brokerAddr, slave);
+ return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr));
}
return null;
}
+ public int findBrokerVersion(String brokerName, String brokerAddr) {
+ if (this.brokerVersionTable.containsKey(brokerName)) {
+ if (this.brokerVersionTable.get(brokerName).containsKey(brokerAddr)) {
+ return this.brokerVersionTable.get(brokerName).get(brokerAddr);
+ }
+ }
+ return 0;
+ }
+
public List<String> findConsumerIdList(final String topic, final String group) {
String brokerAddr = this.findBrokerAddrByTopic(topic);
if (null == brokerAddr) {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index f79f726..f0a73bd 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -99,6 +99,25 @@ public class BrokerConfig {
private boolean traceOn = true;
+ // Switch of filter bit map calculation.
+ // If switch on:
+ // 1. Calculate filter bit map when construct queue.
+ // 2. Filter bit map will be saved to consume queue extend file if allowed.
+ private boolean enableCalcFilterBitMap = false;
+
+ // Expect num of consumers will use filter.
+ private int expectConsumerNumUseFilter = 32;
+
+ // Error rate of bloom filter, 1~100.
+ private int maxErrorRateOfBloomFilter = 20;
+
+ //how long to clean filter data after dead.Default: 24h
+ private long filterDataCleanTimeSpan = 24 * 3600 * 1000;
+
+ // whether do filter when retry.
+ private boolean filterSupportRetry = false;
+ private boolean enablePropertyFilter = false;
+
public static String localHostName() {
try {
return InetAddress.getLocalHost().getHostName();
@@ -484,4 +503,52 @@ public class BrokerConfig {
public void setCommercialBaseCount(int commercialBaseCount) {
this.commercialBaseCount = commercialBaseCount;
}
+
+ public boolean isEnableCalcFilterBitMap() {
+ return enableCalcFilterBitMap;
+ }
+
+ public void setEnableCalcFilterBitMap(boolean enableCalcFilterBitMap) {
+ this.enableCalcFilterBitMap = enableCalcFilterBitMap;
+ }
+
+ public int getExpectConsumerNumUseFilter() {
+ return expectConsumerNumUseFilter;
+ }
+
+ public void setExpectConsumerNumUseFilter(int expectConsumerNumUseFilter) {
+ this.expectConsumerNumUseFilter = expectConsumerNumUseFilter;
+ }
+
+ public int getMaxErrorRateOfBloomFilter() {
+ return maxErrorRateOfBloomFilter;
+ }
+
+ public void setMaxErrorRateOfBloomFilter(int maxErrorRateOfBloomFilter) {
+ this.maxErrorRateOfBloomFilter = maxErrorRateOfBloomFilter;
+ }
+
+ public long getFilterDataCleanTimeSpan() {
+ return filterDataCleanTimeSpan;
+ }
+
+ public void setFilterDataCleanTimeSpan(long filterDataCleanTimeSpan) {
+ this.filterDataCleanTimeSpan = filterDataCleanTimeSpan;
+ }
+
+ public boolean isFilterSupportRetry() {
+ return filterSupportRetry;
+ }
+
+ public void setFilterSupportRetry(boolean filterSupportRetry) {
+ this.filterSupportRetry = filterSupportRetry;
+ }
+
+ public boolean isEnablePropertyFilter() {
+ return enablePropertyFilter;
+ }
+
+ public void setEnablePropertyFilter(boolean enablePropertyFilter) {
+ this.enablePropertyFilter = enablePropertyFilter;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/common/src/main/java/org/apache/rocketmq/common/MixAll.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
index 4a54a60..e75efd9 100644
--- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
@@ -55,8 +55,8 @@ public class MixAll {
public static final String DEFAULT_NAMESRV_ADDR_LOOKUP = "jmenv.tbsite.net";
public static final String WS_DOMAIN_NAME = System.getProperty("rocketmq.namesrv.domain", DEFAULT_NAMESRV_ADDR_LOOKUP);
public static final String WS_DOMAIN_SUBGROUP = System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr");
- // http://jmenv.tbsite.net:8080/rocketmq/nsaddr
- public static final String WS_ADDR = "http://" + WS_DOMAIN_NAME + ":8080/rocketmq/" + WS_DOMAIN_SUBGROUP;
+// // http://jmenv.tbsite.net:8080/rocketmq/nsaddr
+// public static final String WS_ADDR = "http://" + WS_DOMAIN_NAME + ":8080/rocketmq/" + WS_DOMAIN_SUBGROUP;
public static final String DEFAULT_TOPIC = "TBW102";
public static final String BENCHMARK_TOPIC = "BenchmarkTest";
public static final String DEFAULT_PRODUCER_GROUP = "DEFAULT_PRODUCER";
@@ -89,6 +89,16 @@ public class MixAll {
public static final String DEFAULT_TRACE_REGION_ID = "DefaultRegion";
public static final String CONSUME_CONTEXT_TYPE = "ConsumeContextType";
+ public static String getWSAddr() {
+ String wsDomainName = System.getProperty("rocketmq.namesrv.domain", DEFAULT_NAMESRV_ADDR_LOOKUP);
+ String wsDomainSubgroup = System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr");
+ String wsAddr = "http://" + wsDomainName + ":8080/rocketmq/" + wsDomainSubgroup;
+ if (wsDomainName.indexOf(":") > 0) {
+ wsAddr = "http://" + wsDomainName + "/rocketmq/" + wsDomainSubgroup;
+ }
+ return wsAddr;
+ }
+
public static String getRetryTopic(final String consumerGroup) {
return RETRY_GROUP_TOPIC_PREFIX + consumerGroup;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java b/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java
index e706e28..385c121 100644
--- a/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java
+++ b/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java
@@ -34,4 +34,5 @@ public class LoggerName {
public static final String DUPLICATION_LOGGER_NAME = "RocketmqDuplication";
public static final String PROTECTION_LOGGER_NAME = "RocketmqProtection";
public static final String WATER_MARK_LOGGER_NAME = "RocketmqWaterMark";
+ public static final String FILTER_LOGGER_NAME = "RocketmqFilter";
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/common/src/main/java/org/apache/rocketmq/common/filter/ExpressionType.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/filter/ExpressionType.java b/common/src/main/java/org/apache/rocketmq/common/filter/ExpressionType.java
new file mode 100644
index 0000000..3b7940a
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/filter/ExpressionType.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.filter;
+
+public class ExpressionType {
+
+ /**
+ * <ul>
+ * Keywords:
+ * <li>{@code AND, OR, NOT, BETWEEN, IN, TRUE, FALSE, IS, NULL}</li>
+ * </ul>
+ * <p/>
+ * <ul>
+ * Data type:
+ * <li>Boolean, like: TRUE, FALSE</li>
+ * <li>String, like: 'abc'</li>
+ * <li>Decimal, like: 123</li>
+ * <li>Float number, like: 3.1415</li>
+ * </ul>
+ * <p/>
+ * <ul>
+ * Grammar:
+ * <li>{@code AND, OR}</li>
+ * <li>{@code >, >=, <, <=, =}</li>
+ * <li>{@code BETWEEN A AND B}, equals to {@code >=A AND <=B}</li>
+ * <li>{@code NOT BETWEEN A AND B}, equals to {@code >B OR <A}</li>
+ * <li>{@code IN ('a', 'b')}, equals to {@code ='a' OR ='b'}, this operation only support String type.</li>
+ * <li>{@code IS NULL}, {@code IS NOT NULL}, check parameter whether is null, or not.</li>
+ * <li>{@code =TRUE}, {@code =FALSE}, check parameter whether is true, or false.</li>
+ * </ul>
+ * <p/>
+ * <p>
+ * Example:
+ * (a > 10 AND a < 100) OR (b IS NOT NULL AND b=TRUE)
+ * </p>
+ */
+ public static final String SQL92 = "SQL92";
+
+ /**
+ * Only support or operation such as
+ * "tag1 || tag2 || tag3", <br>
+ * If null or * expression,meaning subscribe all.
+ */
+ public static final String TAG = "TAG";
+
+ public static boolean isTagType(String type) {
+ if (type == null || TAG.equals(type)) {
+ return true;
+ }
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/common/src/main/java/org/apache/rocketmq/common/filter/FilterAPI.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/filter/FilterAPI.java b/common/src/main/java/org/apache/rocketmq/common/filter/FilterAPI.java
index e9bf3fa..fc8525c 100644
--- a/common/src/main/java/org/apache/rocketmq/common/filter/FilterAPI.java
+++ b/common/src/main/java/org/apache/rocketmq/common/filter/FilterAPI.java
@@ -63,4 +63,22 @@ public class FilterAPI {
return subscriptionData;
}
+
+ public static SubscriptionData build(final String topic, final String subString,
+ final String type) throws Exception {
+ if (ExpressionType.TAG.equals(type) || type == null) {
+ return buildSubscriptionData(null, topic, subString);
+ }
+
+ if (subString == null || subString.length() < 1) {
+ throw new IllegalArgumentException("Expression can't be null! " + type);
+ }
+
+ SubscriptionData subscriptionData = new SubscriptionData();
+ subscriptionData.setTopic(topic);
+ subscriptionData.setSubString(subString);
+ subscriptionData.setExpressionType(type);
+
+ return subscriptionData;
+ }
}