You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/08/01 07:23:26 UTC
[28/50] [abbrv] incubator-rocketmq git commit: [ROCKETMQ-121]Support
message filtering based on SQL92 closes apache/incubator-rocketmq#82
[ROCKETMQ-121]Support message filtering based on SQL92 closes apache/incubator-rocketmq#82
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/9eeb2f7e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/9eeb2f7e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/9eeb2f7e
Branch: refs/heads/develop
Commit: 9eeb2f7e4e321c2f4688a3d3470731c0314c7cf3
Parents: 2093b65
Author: vsair <li...@gmail.com>
Authored: Fri Apr 21 18:17:58 2017 +0800
Committer: dongeforever <zh...@yeah.net>
Committed: Tue Jun 6 11:37:29 2017 +0800
----------------------------------------------------------------------
broker/pom.xml | 4 +
.../rocketmq/broker/BrokerController.java | 27 +
.../rocketmq/broker/BrokerPathConfigHelper.java | 3 +
.../broker/client/ConsumerGroupEvent.java | 33 +
.../client/ConsumerIdsChangeListener.java | 6 +-
.../rocketmq/broker/client/ConsumerManager.java | 11 +-
.../DefaultConsumerIdsChangeListener.java | 37 +-
.../filter/CommitLogDispatcherCalcBitMap.java | 110 ++
.../broker/filter/ConsumerFilterData.java | 151 ++
.../broker/filter/ConsumerFilterManager.java | 471 ++++++
.../filter/ExpressionForRetryMessageFilter.java | 97 ++
.../broker/filter/ExpressionMessageFilter.java | 162 +++
.../broker/filter/MessageEvaluationContext.java | 58 +
.../NotifyMessageArrivingListener.java | 8 +-
.../broker/longpolling/PullRequest.java | 10 +-
.../longpolling/PullRequestHoldService.java | 19 +-
.../rocketmq/broker/out/BrokerOuterAPI.java | 2 +-
.../plugin/AbstractPluginMessageStore.java | 18 +-
.../broker/processor/AdminBrokerProcessor.java | 91 ++
.../broker/processor/ClientManageProcessor.java | 44 +
.../broker/processor/PullMessageProcessor.java | 59 +-
.../CommitLogDispatcherCalcBitMapTest.java | 192 +++
.../filter/ConsumerFilterManagerTest.java | 291 ++++
.../filter/MessageStoreWithFilterTest.java | 392 +++++
.../processor/PullMessageProcessorTest.java | 9 +-
.../client/consumer/DefaultMQPushConsumer.java | 15 +
.../client/consumer/MQPushConsumer.java | 21 +
.../client/consumer/MessageSelector.java | 77 +
.../rocketmq/client/impl/FindBrokerResult.java | 12 +
.../rocketmq/client/impl/MQClientAPIImpl.java | 57 +-
.../consumer/DefaultMQPushConsumerImpl.java | 40 +-
.../client/impl/consumer/PullAPIWrapper.java | 40 +
.../client/impl/factory/MQClientInstance.java | 60 +-
.../apache/rocketmq/common/BrokerConfig.java | 67 +
.../java/org/apache/rocketmq/common/MixAll.java | 14 +-
.../rocketmq/common/constant/LoggerName.java | 1 +
.../rocketmq/common/filter/ExpressionType.java | 67 +
.../rocketmq/common/filter/FilterAPI.java | 18 +
.../apache/rocketmq/common/message/Message.java | 6 +
.../rocketmq/common/message/MessageDecoder.java | 39 +
.../rocketmq/common/namesrv/TopAddressing.java | 2 +-
.../rocketmq/common/protocol/RequestCode.java | 4 +
.../rocketmq/common/protocol/ResponseCode.java | 4 +
.../protocol/body/CheckClientRequestBody.java | 52 +
.../common/protocol/body/ConsumeQueueData.java | 98 ++
.../body/QueryConsumeQueueResponseBody.java | 72 +
.../header/PullMessageRequestHeader.java | 9 +
.../header/QueryConsumeQueueRequestHeader.java | 75 +
.../protocol/heartbeat/SubscriptionData.java | 17 +-
.../rocketmq/common/filter/FilterAPITest.java | 49 +
.../common/message/MessageDecoderTest.java | 80 ++
distribution/conf/logback_broker.xml | 28 +
distribution/release.xml | 1 +
.../rocketmq/example/benchmark/Consumer.java | 31 +-
.../rocketmq/example/benchmark/Producer.java | 34 +-
.../rocketmq/example/filter/SqlConsumer.java | 62 +
.../rocketmq/example/filter/SqlProducer.java | 67 +
filter/pom.xml | 43 +
.../apache/rocketmq/filter/FilterFactory.java | 72 +
.../org/apache/rocketmq/filter/FilterSpi.java | 43 +
.../org/apache/rocketmq/filter/SqlFilter.java | 43 +
.../rocketmq/filter/constant/UnaryType.java | 26 +
.../filter/expression/BinaryExpression.java | 91 ++
.../filter/expression/BooleanExpression.java | 39 +
.../filter/expression/ComparisonExpression.java | 413 ++++++
.../filter/expression/ConstantExpression.java | 156 ++
.../expression/EmptyEvaluationContext.java | 35 +
.../filter/expression/EvaluationContext.java | 43 +
.../rocketmq/filter/expression/Expression.java | 38 +
.../filter/expression/LogicExpression.java | 94 ++
.../filter/expression/MQFilterException.java | 46 +
.../filter/expression/NowExpression.java | 36 +
.../filter/expression/PropertyExpression.java | 70 +
.../filter/expression/UnaryExpression.java | 267 ++++
.../filter/expression/UnaryInExpression.java | 61 +
.../rocketmq/filter/parser/ParseException.java | 204 +++
.../rocketmq/filter/parser/SelectorParser.java | 1354 ++++++++++++++++++
.../rocketmq/filter/parser/SelectorParser.jj | 524 +++++++
.../filter/parser/SelectorParserConstants.java | 140 ++
.../parser/SelectorParserTokenManager.java | 919 ++++++++++++
.../filter/parser/SimpleCharStream.java | 502 +++++++
.../apache/rocketmq/filter/parser/Token.java | 152 ++
.../rocketmq/filter/parser/TokenMgrError.java | 174 +++
.../apache/rocketmq/filter/util/BitsArray.java | 260 ++++
.../rocketmq/filter/util/BloomFilter.java | 338 +++++
.../rocketmq/filter/util/BloomFilterData.java | 83 ++
.../apache/rocketmq/filter/BitsArrayTest.java | 123 ++
.../apache/rocketmq/filter/BloomFilterTest.java | 172 +++
.../apache/rocketmq/filter/ExpressionTest.java | 594 ++++++++
.../apache/rocketmq/filter/FilterSpiTest.java | 84 ++
.../org/apache/rocketmq/filter/ParserTest.java | 129 ++
pom.xml | 11 +
srvutil/pom.xml | 4 +
.../org/apache/rocketmq/store/CommitLog.java | 8 +-
.../rocketmq/store/CommitLogDispatcher.java | 26 +
.../org/apache/rocketmq/store/ConsumeQueue.java | 122 +-
.../apache/rocketmq/store/ConsumeQueueExt.java | 638 +++++++++
.../rocketmq/store/DefaultMessageFilter.java | 29 +-
.../rocketmq/store/DefaultMessageStore.java | 132 +-
.../apache/rocketmq/store/DispatchRequest.java | 21 +-
.../org/apache/rocketmq/store/MappedFile.java | 25 +
.../apache/rocketmq/store/MappedFileQueue.java | 2 +-
.../rocketmq/store/MessageArrivingListener.java | 5 +-
.../apache/rocketmq/store/MessageFilter.java | 26 +-
.../org/apache/rocketmq/store/MessageStore.java | 8 +-
.../store/config/MessageStoreConfig.java | 31 +
.../store/config/StorePathConfigHelper.java | 4 +
.../store/schedule/ScheduleMessageService.java | 14 +
.../rocketmq/store/ConsumeQueueExtTest.java | 251 ++++
.../apache/rocketmq/store/ConsumeQueueTest.java | 226 +++
.../rocketmq/store/DefaultMessageStoreTest.java | 4 +-
.../rocketmq/tools/admin/DefaultMQAdminExt.java | 9 +
.../tools/admin/DefaultMQAdminExtImpl.java | 8 +
.../apache/rocketmq/tools/admin/MQAdminExt.java | 22 +
.../rocketmq/tools/command/MQAdminStartup.java | 3 +
.../command/queue/QueryConsumeQueueCommand.java | 159 ++
116 files changed, 12552 insertions(+), 128 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/broker/pom.xml
----------------------------------------------------------------------
diff --git a/broker/pom.xml b/broker/pom.xml
index 8cdafea..0f8ad0a 100644
--- a/broker/pom.xml
+++ b/broker/pom.xml
@@ -49,6 +49,10 @@
<artifactId>rocketmq-srvutil</artifactId>
</dependency>
<dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>rocketmq-filter</artifactId>
+ </dependency>
+ <dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 6acd40c..bacd25c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -37,6 +37,8 @@ import org.apache.rocketmq.broker.client.DefaultConsumerIdsChangeListener;
import org.apache.rocketmq.broker.client.ProducerManager;
import org.apache.rocketmq.broker.client.net.Broker2Client;
import org.apache.rocketmq.broker.client.rebalance.RebalanceLockManager;
+import org.apache.rocketmq.broker.filter.CommitLogDispatcherCalcBitMap;
+import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
import org.apache.rocketmq.broker.filtersrv.FilterServerManager;
import org.apache.rocketmq.broker.latency.BrokerFastFailure;
import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
@@ -96,6 +98,7 @@ public class BrokerController {
private final MessageStoreConfig messageStoreConfig;
private final ConsumerOffsetManager consumerOffsetManager;
private final ConsumerManager consumerManager;
+ private final ConsumerFilterManager consumerFilterManager;
private final ProducerManager producerManager;
private final ClientHousekeepingService clientHousekeepingService;
private final PullMessageProcessor pullMessageProcessor;
@@ -149,6 +152,7 @@ public class BrokerController {
this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);
this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);
+ this.consumerFilterManager = new ConsumerFilterManager(this);
this.producerManager = new ProducerManager();
this.clientHousekeepingService = new ClientHousekeepingService(this);
this.broker2Client = new Broker2Client(this);
@@ -192,6 +196,7 @@ public class BrokerController {
result = result && this.consumerOffsetManager.load();
result = result && this.subscriptionGroupManager.load();
+ result = result && this.consumerFilterManager.load();
if (result) {
try {
@@ -202,6 +207,7 @@ public class BrokerController {
//load plugin
MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
this.messageStore = MessageStoreFactory.build(context, this.messageStore);
+ this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
} catch (IOException e) {
result = false;
e.printStackTrace();
@@ -278,6 +284,17 @@ public class BrokerController {
@Override
public void run() {
try {
+ BrokerController.this.consumerFilterManager.persist();
+ } catch (Throwable e) {
+ log.error("schedule persist consumer filter error.", e);
+ }
+ }
+ }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
+
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
BrokerController.this.protectBroker();
} catch (Exception e) {
log.error("protectBroker error.", e);
@@ -400,9 +417,11 @@ public class BrokerController {
ClientManageProcessor clientProcessor = new ClientManageProcessor(this);
this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.clientManageExecutor);
this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
+ this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.clientManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
+ this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);
/**
* ConsumerManageProcessor
@@ -504,6 +523,10 @@ public class BrokerController {
return consumerManager;
}
+ public ConsumerFilterManager getConsumerFilterManager() {
+ return consumerFilterManager;
+ }
+
public ConsumerOffsetManager getConsumerOffsetManager() {
return consumerOffsetManager;
}
@@ -590,6 +613,10 @@ public class BrokerController {
if (this.brokerFastFailure != null) {
this.brokerFastFailure.shutdown();
}
+
+ if (this.consumerFilterManager != null) {
+ this.consumerFilterManager.persist();
+ }
}
private void unregisterBrokerAll() {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java
index 24876df..0a323ee 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java
@@ -44,4 +44,7 @@ public class BrokerPathConfigHelper {
return rootDir + File.separator + "config" + File.separator + "subscriptionGroup.json";
}
+ public static String getConsumerFilterPath(final String rootDir) {
+ return rootDir + File.separator + "config" + File.separator + "consumerFilter.json";
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupEvent.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupEvent.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupEvent.java
new file mode 100644
index 0000000..717fb70
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupEvent.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.client;
+
+public enum ConsumerGroupEvent {
+
+ /**
+ * Some consumers in the group are changed.
+ */
+ CHANGE,
+ /**
+ * The group of consumer is unregistered.
+ */
+ UNREGISTER,
+ /**
+ * The group of consumer is registered.
+ */
+ REGISTER
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerIdsChangeListener.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerIdsChangeListener.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerIdsChangeListener.java
index 07d28dc..831e293 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerIdsChangeListener.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerIdsChangeListener.java
@@ -16,9 +16,7 @@
*/
package org.apache.rocketmq.broker.client;
-import io.netty.channel.Channel;
-import java.util.List;
-
public interface ConsumerIdsChangeListener {
- void consumerIdsChanged(final String group, final List<Channel> channels);
+
+ void handle(ConsumerGroupEvent event, String group, Object... args);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
index a2d88d5..a5ddec8 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
@@ -85,10 +85,11 @@ public class ConsumerManager {
if (remove != null) {
log.info("unregister consumer ok, no any connection, and remove consumer group, {}",
next.getKey());
+ this.consumerIdsChangeListener.handle(ConsumerGroupEvent.UNREGISTER, next.getKey());
}
}
- this.consumerIdsChangeListener.consumerIdsChanged(next.getKey(), info.getAllChannel());
+ this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, next.getKey(), info.getAllChannel());
}
}
}
@@ -111,10 +112,12 @@ public class ConsumerManager {
if (r1 || r2) {
if (isNotifyConsumerIdsChangedEnable) {
- this.consumerIdsChangeListener.consumerIdsChanged(group, consumerGroupInfo.getAllChannel());
+ this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
}
}
+ this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);
+
return r1 || r2;
}
@@ -126,10 +129,12 @@ public class ConsumerManager {
ConsumerGroupInfo remove = this.consumerTable.remove(group);
if (remove != null) {
log.info("unregister consumer ok, no any connection, and remove consumer group, {}", group);
+
+ this.consumerIdsChangeListener.handle(ConsumerGroupEvent.UNREGISTER, group);
}
}
if (isNotifyConsumerIdsChangedEnable) {
- this.consumerIdsChangeListener.consumerIdsChanged(group, consumerGroupInfo.getAllChannel());
+ this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java b/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
index a1b2d8a..d716a33 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
@@ -17,8 +17,12 @@
package org.apache.rocketmq.broker.client;
import io.netty.channel.Channel;
+
+import java.util.Collection;
import java.util.List;
+
import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListener {
private final BrokerController brokerController;
@@ -28,11 +32,34 @@ public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListen
}
@Override
- public void consumerIdsChanged(String group, List<Channel> channels) {
- if (channels != null && brokerController.getBrokerConfig().isNotifyConsumerIdsChangedEnable()) {
- for (Channel chl : channels) {
- this.brokerController.getBroker2Client().notifyConsumerIdsChanged(chl, group);
- }
+ public void handle(ConsumerGroupEvent event, String group, Object... args) {
+ if (event == null) {
+ return;
+ }
+ switch (event) {
+ case CHANGE:
+ if (args == null || args.length < 1) {
+ return;
+ }
+ List<Channel> channels = (List<Channel>) args[0];
+ if (channels != null && brokerController.getBrokerConfig().isNotifyConsumerIdsChangedEnable()) {
+ for (Channel chl : channels) {
+ this.brokerController.getBroker2Client().notifyConsumerIdsChanged(chl, group);
+ }
+ }
+ break;
+ case UNREGISTER:
+ this.brokerController.getConsumerFilterManager().unRegister(group);
+ break;
+ case REGISTER:
+ if (args == null || args.length < 1) {
+ return;
+ }
+ Collection<SubscriptionData> subscriptionDataList = (Collection<SubscriptionData>) args[0];
+ this.brokerController.getConsumerFilterManager().register(group, subscriptionDataList);
+ break;
+ default:
+ throw new RuntimeException("Unknown event " + event);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/broker/src/main/java/org/apache/rocketmq/broker/filter/CommitLogDispatcherCalcBitMap.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filter/CommitLogDispatcherCalcBitMap.java b/broker/src/main/java/org/apache/rocketmq/broker/filter/CommitLogDispatcherCalcBitMap.java
new file mode 100644
index 0000000..85415d6
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/filter/CommitLogDispatcherCalcBitMap.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.filter;
+
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.filter.util.BitsArray;
+import org.apache.rocketmq.store.CommitLogDispatcher;
+import org.apache.rocketmq.store.DispatchRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Iterator;
+
+/**
+ * Calculate bit map of filter.
+ */
+public class CommitLogDispatcherCalcBitMap implements CommitLogDispatcher {
+
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTER_LOGGER_NAME);
+
+ protected final BrokerConfig brokerConfig;
+ protected final ConsumerFilterManager consumerFilterManager;
+
+ public CommitLogDispatcherCalcBitMap(BrokerConfig brokerConfig, ConsumerFilterManager consumerFilterManager) {
+ this.brokerConfig = brokerConfig;
+ this.consumerFilterManager = consumerFilterManager;
+ }
+
+ @Override
+ public void dispatch(DispatchRequest request) {
+ if (!this.brokerConfig.isEnableCalcFilterBitMap()) {
+ return;
+ }
+
+ try {
+
+ Collection<ConsumerFilterData> filterDatas = consumerFilterManager.get(request.getTopic());
+
+ if (filterDatas == null || filterDatas.isEmpty()) {
+ return;
+ }
+
+ Iterator<ConsumerFilterData> iterator = filterDatas.iterator();
+ BitsArray filterBitMap = BitsArray.create(
+ this.consumerFilterManager.getBloomFilter().getM()
+ );
+
+ long startTime = System.currentTimeMillis();
+ while (iterator.hasNext()) {
+ ConsumerFilterData filterData = iterator.next();
+
+ if (filterData.getCompiledExpression() == null) {
+ log.error("[BUG] Consumer in filter manager has no compiled expression! {}", filterData);
+ continue;
+ }
+
+ if (filterData.getBloomFilterData() == null) {
+ log.error("[BUG] Consumer in filter manager has no bloom data! {}", filterData);
+ continue;
+ }
+
+ Object ret = null;
+ try {
+ MessageEvaluationContext context = new MessageEvaluationContext(request.getPropertiesMap());
+
+ ret = filterData.getCompiledExpression().evaluate(context);
+ } catch (Throwable e) {
+ log.error("Calc filter bit map error!commitLogOffset={}, consumer={}, {}", request.getCommitLogOffset(), filterData, e);
+ }
+
+ log.debug("Result of Calc bit map:ret={}, data={}, props={}, offset={}", ret, filterData, request.getPropertiesMap(), request.getCommitLogOffset());
+
+ // eval true
+ if (ret != null && ret instanceof Boolean && (Boolean) ret) {
+ consumerFilterManager.getBloomFilter().hashTo(
+ filterData.getBloomFilterData(),
+ filterBitMap
+ );
+ }
+ }
+
+ request.setBitMap(filterBitMap.bytes());
+
+ long eclipseTime = System.currentTimeMillis() - startTime;
+ // 1ms
+ if (eclipseTime >= 1) {
+ log.warn("Spend {} ms to calc bit map, consumerNum={}, topic={}", eclipseTime, filterDatas.size(), request.getTopic());
+ }
+ } catch (Throwable e) {
+ log.error("Calc bit map error! topic={}, offset={}, queueId={}, {}", request.getTopic(), request.getCommitLogOffset(), request.getQueueId(), e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterData.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterData.java b/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterData.java
new file mode 100644
index 0000000..4db02e2
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterData.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.filter;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+import org.apache.rocketmq.filter.expression.Expression;
+import org.apache.rocketmq.filter.util.BloomFilterData;
+
+import java.util.Collections;
+
+/**
+ * Filter data of consumer.
+ */
+public class ConsumerFilterData {
+
+ private String consumerGroup;
+ private String topic;
+ private String expression;
+ private String expressionType;
+ private transient Expression compiledExpression;
+ private long bornTime;
+ private long deadTime = 0;
+ private BloomFilterData bloomFilterData;
+ private long clientVersion;
+
+ public boolean isDead() {
+ return this.deadTime >= this.bornTime;
+ }
+
+ public long howLongAfterDeath() {
+ if (isDead()) {
+ return System.currentTimeMillis() - getDeadTime();
+ }
+ return -1;
+ }
+
+ /**
+ * Check this filter data has been used to calculate bit map when msg was stored in server.
+ *
+ * @param msgStoreTime
+ * @return
+ */
+ public boolean isMsgInLive(long msgStoreTime) {
+ return msgStoreTime > getBornTime();
+ }
+
+ public String getConsumerGroup() {
+ return consumerGroup;
+ }
+
+ public void setConsumerGroup(final String consumerGroup) {
+ this.consumerGroup = consumerGroup;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(final String topic) {
+ this.topic = topic;
+ }
+
+ public String getExpression() {
+ return expression;
+ }
+
+ public void setExpression(final String expression) {
+ this.expression = expression;
+ }
+
+ public String getExpressionType() {
+ return expressionType;
+ }
+
+ public void setExpressionType(final String expressionType) {
+ this.expressionType = expressionType;
+ }
+
+ public Expression getCompiledExpression() {
+ return compiledExpression;
+ }
+
+ public void setCompiledExpression(final Expression compiledExpression) {
+ this.compiledExpression = compiledExpression;
+ }
+
+ public long getBornTime() {
+ return bornTime;
+ }
+
+ public void setBornTime(final long bornTime) {
+ this.bornTime = bornTime;
+ }
+
+ public long getDeadTime() {
+ return deadTime;
+ }
+
+ public void setDeadTime(final long deadTime) {
+ this.deadTime = deadTime;
+ }
+
+ public BloomFilterData getBloomFilterData() {
+ return bloomFilterData;
+ }
+
+ public void setBloomFilterData(final BloomFilterData bloomFilterData) {
+ this.bloomFilterData = bloomFilterData;
+ }
+
+ public long getClientVersion() {
+ return clientVersion;
+ }
+
+ public void setClientVersion(long clientVersion) {
+ this.clientVersion = clientVersion;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return EqualsBuilder.reflectionEquals(this, o, Collections.<String>emptyList());
+ }
+
+ @Override
+ public int hashCode() {
+ return HashCodeBuilder.reflectionHashCode(this, Collections.<String>emptyList());
+ }
+
+ @Override
+ public String toString() {
+ return ReflectionToStringBuilder.toString(this, ToStringStyle.SHORT_PREFIX_STYLE);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterManager.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterManager.java b/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterManager.java
new file mode 100644
index 0000000..7f790af
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterManager.java
@@ -0,0 +1,471 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.filter;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.BrokerPathConfigHelper;
+import org.apache.rocketmq.common.ConfigManager;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.filter.FilterFactory;
+import org.apache.rocketmq.common.filter.ExpressionType;
+import org.apache.rocketmq.filter.util.BloomFilter;
+import org.apache.rocketmq.filter.util.BloomFilterData;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Consumer filter data manager.Just manage the consumers use expression filter.
+ */
+public class ConsumerFilterManager extends ConfigManager {
+
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTER_LOGGER_NAME);
+
+ private static final long MS_24_HOUR = 24 * 3600 * 1000;
+
+ private ConcurrentHashMap<String/*Topic*/, FilterDataMapByTopic>
+ filterDataByTopic = new ConcurrentHashMap<String/*consumer group*/, FilterDataMapByTopic>(256);
+
+ private transient BrokerController brokerController;
+ private transient BloomFilter bloomFilter;
+
+ public ConsumerFilterManager() {
+ // just for test
+ this.bloomFilter = BloomFilter.createByFn(20, 64);
+ }
+
+ public ConsumerFilterManager(BrokerController brokerController) {
+ this.brokerController = brokerController;
+ this.bloomFilter = BloomFilter.createByFn(
+ brokerController.getBrokerConfig().getMaxErrorRateOfBloomFilter(),
+ brokerController.getBrokerConfig().getExpectConsumerNumUseFilter()
+ );
+ // then set bit map length of store config.
+ brokerController.getMessageStoreConfig().setBitMapLengthConsumeQueueExt(
+ this.bloomFilter.getM()
+ );
+ }
+
+ /**
+ * Build consumer filter data.Be care, bloom filter data is not included.
+ *
+ * @param topic
+ * @param consumerGroup
+ * @param expression
+ * @param type
+ * @param clientVersion
+ * @return maybe null
+ */
+ public static ConsumerFilterData build(final String topic, final String consumerGroup,
+ final String expression, final String type,
+ final long clientVersion) {
+ if (ExpressionType.isTagType(type)) {
+ return null;
+ }
+
+ ConsumerFilterData consumerFilterData = new ConsumerFilterData();
+ consumerFilterData.setTopic(topic);
+ consumerFilterData.setConsumerGroup(consumerGroup);
+ consumerFilterData.setBornTime(System.currentTimeMillis());
+ consumerFilterData.setDeadTime(0);
+ consumerFilterData.setExpression(expression);
+ consumerFilterData.setExpressionType(type);
+ consumerFilterData.setClientVersion(clientVersion);
+ try {
+ consumerFilterData.setCompiledExpression(
+ FilterFactory.INSTANCE.get(type).compile(expression)
+ );
+ } catch (Throwable e) {
+ log.error("parse error: expr={}, topic={}, group={}, error={}", expression, topic, consumerGroup, e.getMessage());
+ return null;
+ }
+
+ return consumerFilterData;
+ }
+
+ public void register(final String consumerGroup, final Collection<SubscriptionData> subList) {
+ for (SubscriptionData subscriptionData : subList) {
+ register(
+ subscriptionData.getTopic(),
+ consumerGroup,
+ subscriptionData.getSubString(),
+ subscriptionData.getExpressionType(),
+ subscriptionData.getSubVersion()
+ );
+ }
+
+ // make illegal topic dead.
+ Collection<ConsumerFilterData> groupFilterData = getByGroup(consumerGroup);
+
+ Iterator<ConsumerFilterData> iterator = groupFilterData.iterator();
+ while (iterator.hasNext()) {
+ ConsumerFilterData filterData = iterator.next();
+
+ boolean exist = false;
+ for (SubscriptionData subscriptionData : subList) {
+ if (subscriptionData.getTopic().equals(filterData.getTopic())) {
+ exist = true;
+ break;
+ }
+ }
+
+ if (!exist && !filterData.isDead()) {
+ filterData.setDeadTime(System.currentTimeMillis());
+ log.info("Consumer filter changed: {}, make illegal topic dead:{}", consumerGroup, filterData);
+ }
+ }
+ }
+
+ public boolean register(final String topic, final String consumerGroup, final String expression,
+ final String type, final long clientVersion) {
+ if (ExpressionType.isTagType(type)) {
+ return false;
+ }
+
+ if (expression == null || expression.length() == 0) {
+ return false;
+ }
+
+ FilterDataMapByTopic filterDataMapByTopic = this.filterDataByTopic.get(topic);
+
+ if (filterDataMapByTopic == null) {
+ FilterDataMapByTopic temp = new FilterDataMapByTopic(topic);
+ FilterDataMapByTopic prev = this.filterDataByTopic.putIfAbsent(topic, temp);
+ filterDataMapByTopic = prev != null ? prev : temp;
+ }
+
+ BloomFilterData bloomFilterData = bloomFilter.generate(consumerGroup + "#" + topic);
+
+ return filterDataMapByTopic.register(consumerGroup, expression, type, bloomFilterData, clientVersion);
+ }
+
+ public void unRegister(final String consumerGroup) {
+ for (String topic : filterDataByTopic.keySet()) {
+ this.filterDataByTopic.get(topic).unRegister(consumerGroup);
+ }
+ }
+
+ public ConsumerFilterData get(final String topic, final String consumerGroup) {
+ if (!this.filterDataByTopic.containsKey(topic)) {
+ return null;
+ }
+ if (this.filterDataByTopic.get(topic).getGroupFilterData().isEmpty()) {
+ return null;
+ }
+
+ return this.filterDataByTopic.get(topic).getGroupFilterData().get(consumerGroup);
+ }
+
+ public Collection<ConsumerFilterData> getByGroup(final String consumerGroup) {
+ Collection<ConsumerFilterData> ret = new HashSet<ConsumerFilterData>();
+
+ Iterator<FilterDataMapByTopic> topicIterator = this.filterDataByTopic.values().iterator();
+ while (topicIterator.hasNext()) {
+ FilterDataMapByTopic filterDataMapByTopic = topicIterator.next();
+
+ Iterator<ConsumerFilterData> filterDataIterator = filterDataMapByTopic.getGroupFilterData().values().iterator();
+
+ while (filterDataIterator.hasNext()) {
+ ConsumerFilterData filterData = filterDataIterator.next();
+
+ if (filterData.getConsumerGroup().equals(consumerGroup)) {
+ ret.add(filterData);
+ }
+ }
+ }
+
+ return ret;
+ }
+
+ public final Collection<ConsumerFilterData> get(final String topic) {
+ if (!this.filterDataByTopic.containsKey(topic)) {
+ return null;
+ }
+ if (this.filterDataByTopic.get(topic).getGroupFilterData().isEmpty()) {
+ return null;
+ }
+
+ return this.filterDataByTopic.get(topic).getGroupFilterData().values();
+ }
+
+ public BloomFilter getBloomFilter() {
+ return bloomFilter;
+ }
+
+ @Override
+ public String encode() {
+ return encode(false);
+ }
+
+ @Override
+ public String configFilePath() {
+ if (this.brokerController != null) {
+ return BrokerPathConfigHelper.getConsumerFilterPath(
+ this.brokerController.getMessageStoreConfig().getStorePathRootDir()
+ );
+ }
+ return BrokerPathConfigHelper.getConsumerFilterPath("./unit_test");
+ }
+
+ @Override
+ public void decode(final String jsonString) {
+ ConsumerFilterManager load = RemotingSerializable.fromJson(jsonString, ConsumerFilterManager.class);
+ if (load != null && load.filterDataByTopic != null) {
+ boolean bloomChanged = false;
+ for (String topic : load.filterDataByTopic.keySet()) {
+ FilterDataMapByTopic dataMapByTopic = load.filterDataByTopic.get(topic);
+ if (dataMapByTopic == null) {
+ continue;
+ }
+
+ for (String group : dataMapByTopic.getGroupFilterData().keySet()) {
+
+ ConsumerFilterData filterData = dataMapByTopic.getGroupFilterData().get(group);
+
+ if (filterData == null) {
+ continue;
+ }
+
+ try {
+ filterData.setCompiledExpression(
+ FilterFactory.INSTANCE.get(filterData.getExpressionType()).compile(filterData.getExpression())
+ );
+ } catch (Exception e) {
+ log.error("load filter data error, " + filterData, e);
+ }
+
+ // check whether bloom filter is changed
+ // if changed, ignore the bit map calculated before.
+ if (!this.bloomFilter.isValid(filterData.getBloomFilterData())) {
+ bloomChanged = true;
+ log.info("Bloom filter is changed!So ignore all filter data persisted! {}, {}", this.bloomFilter, filterData.getBloomFilterData());
+ break;
+ }
+
+ log.info("load exist consumer filter data: {}", filterData);
+
+ if (filterData.getDeadTime() == 0) {
+ // we think all consumers are dead when load
+ long deadTime = System.currentTimeMillis() - 30 * 1000;
+ filterData.setDeadTime(
+ deadTime <= filterData.getBornTime() ? filterData.getBornTime() : deadTime
+ );
+ }
+ }
+ }
+
+ if (!bloomChanged) {
+ this.filterDataByTopic = load.filterDataByTopic;
+ }
+ }
+ }
+
+ @Override
+ public String encode(final boolean prettyFormat) {
+ // clean
+ {
+ clean();
+ }
+ return RemotingSerializable.toJson(this, prettyFormat);
+ }
+
+ public void clean() {
+ Iterator<Map.Entry<String, FilterDataMapByTopic>> topicIterator = this.filterDataByTopic.entrySet().iterator();
+ while (topicIterator.hasNext()) {
+ Map.Entry<String, FilterDataMapByTopic> filterDataMapByTopic = topicIterator.next();
+
+ Iterator<Map.Entry<String, ConsumerFilterData>> filterDataIterator
+ = filterDataMapByTopic.getValue().getGroupFilterData().entrySet().iterator();
+
+ while (filterDataIterator.hasNext()) {
+ Map.Entry<String, ConsumerFilterData> filterDataByGroup = filterDataIterator.next();
+
+ ConsumerFilterData filterData = filterDataByGroup.getValue();
+ if (filterData.howLongAfterDeath() >= (this.brokerController == null ? MS_24_HOUR : this.brokerController.getBrokerConfig().getFilterDataCleanTimeSpan())) {
+ log.info("Remove filter consumer {}, died too long!", filterDataByGroup.getValue());
+ filterDataIterator.remove();
+ }
+ }
+
+ if (filterDataMapByTopic.getValue().getGroupFilterData().isEmpty()) {
+ log.info("Topic has no consumer, remove it! {}", filterDataMapByTopic.getKey());
+ topicIterator.remove();
+ }
+ }
+ }
+
+ public ConcurrentHashMap<String, FilterDataMapByTopic> getFilterDataByTopic() {
+ return filterDataByTopic;
+ }
+
+ public void setFilterDataByTopic(final ConcurrentHashMap<String, FilterDataMapByTopic> filterDataByTopic) {
+ this.filterDataByTopic = filterDataByTopic;
+ }
+
+ public static class FilterDataMapByTopic {
+
+ private ConcurrentHashMap<String/*consumer group*/, ConsumerFilterData>
+ groupFilterData = new ConcurrentHashMap<String, ConsumerFilterData>();
+
+ private String topic;
+
+ public FilterDataMapByTopic() {
+ }
+
+ public FilterDataMapByTopic(String topic) {
+ this.topic = topic;
+ }
+
+ public void unRegister(String consumerGroup) {
+ if (!this.groupFilterData.containsKey(consumerGroup)) {
+ return;
+ }
+
+ ConsumerFilterData data = this.groupFilterData.get(consumerGroup);
+
+ if (data == null || data.isDead()) {
+ return;
+ }
+
+ long now = System.currentTimeMillis();
+
+ log.info("Unregister consumer filter: {}, deadTime: {}", data, now);
+
+ data.setDeadTime(now);
+ }
+
+ public boolean register(String consumerGroup, String expression, String type, BloomFilterData bloomFilterData, long clientVersion) {
+ ConsumerFilterData old = this.groupFilterData.get(consumerGroup);
+
+ if (old == null) {
+ ConsumerFilterData consumerFilterData = build(topic, consumerGroup, expression, type, clientVersion);
+ if (consumerFilterData == null) {
+ return false;
+ }
+ consumerFilterData.setBloomFilterData(bloomFilterData);
+
+ old = this.groupFilterData.putIfAbsent(consumerGroup, consumerFilterData);
+ if (old == null) {
+ log.info("New consumer filter registered: {}", consumerFilterData);
+ return true;
+ } else {
+ if (clientVersion <= old.getClientVersion()) {
+ if (!type.equals(old.getExpressionType()) || !expression.equals(old.getExpression())) {
+ log.warn("Ignore consumer({} : {}) filter(concurrent), because of version {} <= {}, but maybe info changed!old={}:{}, ignored={}:{}",
+ consumerGroup, topic,
+ clientVersion, old.getClientVersion(),
+ old.getExpressionType(), old.getExpression(),
+ type, expression);
+ }
+ if (clientVersion == old.getClientVersion() && old.isDead()) {
+ reAlive(old);
+ return true;
+ }
+
+ return false;
+ } else {
+ this.groupFilterData.put(consumerGroup, consumerFilterData);
+ log.info("New consumer filter registered(concurrent): {}, old: {}", consumerFilterData, old);
+ return true;
+ }
+ }
+ } else {
+ if (clientVersion <= old.getClientVersion()) {
+ if (!type.equals(old.getExpressionType()) || !expression.equals(old.getExpression())) {
+ log.info("Ignore consumer({}:{}) filter, because of version {} <= {}, but maybe info changed!old={}:{}, ignored={}:{}",
+ consumerGroup, topic,
+ clientVersion, old.getClientVersion(),
+ old.getExpressionType(), old.getExpression(),
+ type, expression);
+ }
+ if (clientVersion == old.getClientVersion() && old.isDead()) {
+ reAlive(old);
+ return true;
+ }
+
+ return false;
+ }
+
+ boolean change = !old.getExpression().equals(expression) || !old.getExpressionType().equals(type);
+ if (old.getBloomFilterData() == null && bloomFilterData != null) {
+ change = true;
+ }
+ if (old.getBloomFilterData() != null && !old.getBloomFilterData().equals(bloomFilterData)) {
+ change = true;
+ }
+
+ // if subscribe data is changed, or consumer is died too long.
+ if (change) {
+ ConsumerFilterData consumerFilterData = build(topic, consumerGroup, expression, type, clientVersion);
+ if (consumerFilterData == null) {
+ // new expression compile error, remove old, let client report error.
+ this.groupFilterData.remove(consumerGroup);
+ return false;
+ }
+ consumerFilterData.setBloomFilterData(bloomFilterData);
+
+ this.groupFilterData.put(consumerGroup, consumerFilterData);
+
+ log.info("Consumer filter info change, old: {}, new: {}, change: {}",
+ old, consumerFilterData, change);
+
+ return true;
+ } else {
+ old.setClientVersion(clientVersion);
+ if (old.isDead()) {
+ reAlive(old);
+ }
+ return true;
+ }
+ }
+ }
+
+ protected void reAlive(ConsumerFilterData filterData) {
+ long oldDeadTime = filterData.getDeadTime();
+ filterData.setDeadTime(0);
+ log.info("Re alive consumer filter: {}, oldDeadTime: {}", filterData, oldDeadTime);
+ }
+
+ public final ConsumerFilterData get(String consumerGroup) {
+ return this.groupFilterData.get(consumerGroup);
+ }
+
+ public final ConcurrentHashMap<String, ConsumerFilterData> getGroupFilterData() {
+ return this.groupFilterData;
+ }
+
+ public void setGroupFilterData(final ConcurrentHashMap<String, ConsumerFilterData> groupFilterData) {
+ this.groupFilterData = groupFilterData;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(final String topic) {
+ this.topic = topic;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java
new file mode 100644
index 0000000..9518178
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionForRetryMessageFilter.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.filter;
+
+
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.filter.ExpressionType;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+/**
+ * Support filter to retry topic.
+ * <br>It will decode properties first in order to get real topic.
+ */
+public class ExpressionForRetryMessageFilter extends ExpressionMessageFilter {
+ public ExpressionForRetryMessageFilter(SubscriptionData subscriptionData, ConsumerFilterData consumerFilterData, ConsumerFilterManager consumerFilterManager) {
+ super(subscriptionData, consumerFilterData, consumerFilterManager);
+ }
+
+ @Override
+ public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) {
+ if (subscriptionData == null) {
+ return true;
+ }
+
+ if (subscriptionData.isClassFilterMode()) {
+ return true;
+ }
+
+ boolean isRetryTopic = subscriptionData.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX);
+
+ if (!isRetryTopic && ExpressionType.isTagType(subscriptionData.getExpressionType())) {
+ return true;
+ }
+
+ ConsumerFilterData realFilterData = this.consumerFilterData;
+ Map<String, String> tempProperties = properties;
+ boolean decoded = false;
+ if (isRetryTopic) {
+ // retry topic, use original filter data.
+ // poor performance to support retry filter.
+ if (tempProperties == null && msgBuffer != null) {
+ decoded = true;
+ tempProperties = MessageDecoder.decodeProperties(msgBuffer);
+ }
+ String realTopic = tempProperties.get(MessageConst.PROPERTY_RETRY_TOPIC);
+ String group = subscriptionData.getTopic().substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
+ realFilterData = this.consumerFilterManager.get(realTopic, group);
+ }
+
+ // no expression
+ if (realFilterData == null || realFilterData.getExpression() == null
+ || realFilterData.getCompiledExpression() == null) {
+ return true;
+ }
+
+ if (!decoded && tempProperties == null && msgBuffer != null) {
+ tempProperties = MessageDecoder.decodeProperties(msgBuffer);
+ }
+
+ Object ret = null;
+ try {
+ MessageEvaluationContext context = new MessageEvaluationContext(tempProperties);
+
+ ret = realFilterData.getCompiledExpression().evaluate(context);
+ } catch (Throwable e) {
+ log.error("Message Filter error, " + realFilterData + ", " + tempProperties, e);
+ }
+
+ log.debug("Pull eval result: {}, {}, {}", ret, realFilterData, tempProperties);
+
+ if (ret == null || !(ret instanceof Boolean)) {
+ return false;
+ }
+
+ return (Boolean) ret;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionMessageFilter.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionMessageFilter.java b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionMessageFilter.java
new file mode 100644
index 0000000..893df0d
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionMessageFilter.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.filter;
+
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.filter.ExpressionType;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.filter.util.BitsArray;
+import org.apache.rocketmq.filter.util.BloomFilter;
+import org.apache.rocketmq.store.ConsumeQueueExt;
+import org.apache.rocketmq.store.MessageFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+public class ExpressionMessageFilter implements MessageFilter {
+
+ protected static final Logger log = LoggerFactory.getLogger(LoggerName.FILTER_LOGGER_NAME);
+
+ protected final SubscriptionData subscriptionData;
+ protected final ConsumerFilterData consumerFilterData;
+ protected final ConsumerFilterManager consumerFilterManager;
+ protected final boolean bloomDataValid;
+
+ public ExpressionMessageFilter(SubscriptionData subscriptionData, ConsumerFilterData consumerFilterData,
+ ConsumerFilterManager consumerFilterManager) {
+ this.subscriptionData = subscriptionData;
+ this.consumerFilterData = consumerFilterData;
+ this.consumerFilterManager = consumerFilterManager;
+ if (consumerFilterData == null) {
+ bloomDataValid = false;
+ return;
+ }
+ BloomFilter bloomFilter = this.consumerFilterManager.getBloomFilter();
+ if (bloomFilter != null && bloomFilter.isValid(consumerFilterData.getBloomFilterData())) {
+ bloomDataValid = true;
+ } else {
+ bloomDataValid = false;
+ }
+ }
+
+ @Override
+ public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
+ if (null == subscriptionData) {
+ return true;
+ }
+
+ if (subscriptionData.isClassFilterMode()) {
+ return true;
+ }
+
+ // by tags code.
+ if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
+
+ if (tagsCode == null || tagsCode < 0L) {
+ return true;
+ }
+
+ if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)) {
+ return true;
+ }
+
+ return subscriptionData.getCodeSet().contains(tagsCode.intValue());
+ } else {
+ // no expression or no bloom
+ if (consumerFilterData == null || consumerFilterData.getExpression() == null
+ || consumerFilterData.getCompiledExpression() == null || consumerFilterData.getBloomFilterData() == null) {
+ return true;
+ }
+
+ // message is before consumer
+ if (cqExtUnit == null || !consumerFilterData.isMsgInLive(cqExtUnit.getMsgStoreTime())) {
+ log.debug("Pull matched because not in live: {}, {}", consumerFilterData, cqExtUnit);
+ return true;
+ }
+
+ byte[] filterBitMap = cqExtUnit.getFilterBitMap();
+ BloomFilter bloomFilter = this.consumerFilterManager.getBloomFilter();
+ if (filterBitMap == null || !this.bloomDataValid
+ || filterBitMap.length * Byte.SIZE != consumerFilterData.getBloomFilterData().getBitNum()) {
+ return true;
+ }
+
+ BitsArray bitsArray = null;
+ try {
+ bitsArray = BitsArray.create(filterBitMap);
+ boolean ret = bloomFilter.isHit(consumerFilterData.getBloomFilterData(), bitsArray);
+ log.debug("Pull {} by bit map:{}, {}, {}", ret, consumerFilterData, bitsArray, cqExtUnit);
+ return ret;
+ } catch (Throwable e) {
+ log.error("bloom filter error, sub=" + subscriptionData
+ + ", filter=" + consumerFilterData + ", bitMap=" + bitsArray, e);
+ }
+ }
+
+ return true;
+ }
+
+ @Override
+ public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) {
+ if (subscriptionData == null) {
+ return true;
+ }
+
+ if (subscriptionData.isClassFilterMode()) {
+ return true;
+ }
+
+ if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
+ return true;
+ }
+
+ ConsumerFilterData realFilterData = this.consumerFilterData;
+ Map<String, String> tempProperties = properties;
+
+ // no expression
+ if (realFilterData == null || realFilterData.getExpression() == null
+ || realFilterData.getCompiledExpression() == null) {
+ return true;
+ }
+
+ if (tempProperties == null && msgBuffer != null) {
+ tempProperties = MessageDecoder.decodeProperties(msgBuffer);
+ }
+
+ Object ret = null;
+ try {
+ MessageEvaluationContext context = new MessageEvaluationContext(tempProperties);
+
+ ret = realFilterData.getCompiledExpression().evaluate(context);
+ } catch (Throwable e) {
+ log.error("Message Filter error, " + realFilterData + ", " + tempProperties, e);
+ }
+
+ log.debug("Pull eval result: {}, {}, {}", ret, realFilterData, tempProperties);
+
+ if (ret == null || !(ret instanceof Boolean)) {
+ return false;
+ }
+
+ return (Boolean) ret;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/broker/src/main/java/org/apache/rocketmq/broker/filter/MessageEvaluationContext.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filter/MessageEvaluationContext.java b/broker/src/main/java/org/apache/rocketmq/broker/filter/MessageEvaluationContext.java
new file mode 100644
index 0000000..879d179
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/filter/MessageEvaluationContext.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.filter;
+
+import org.apache.rocketmq.filter.expression.EvaluationContext;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Evaluation context from message.
+ */
+public class MessageEvaluationContext implements EvaluationContext {
+
+ private Map<String, String> properties;
+
+ public MessageEvaluationContext(Map<String, String> properties) {
+ this.properties = properties;
+ }
+
+ @Override
+ public Object get(final String name) {
+ if (this.properties == null) {
+ return null;
+ }
+ return this.properties.get(name);
+ }
+
+ @Override
+ public Map<String, Object> keyValues() {
+ if (properties == null) {
+ return null;
+ }
+
+ Map<String, Object> copy = new HashMap<String, Object>(properties.size(), 1);
+
+ for (String key : properties.keySet()) {
+ copy.put(key, properties.get(key));
+ }
+
+ return copy;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
index 2dec9f7..fd38c4f 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
@@ -19,6 +19,8 @@ package org.apache.rocketmq.broker.longpolling;
import org.apache.rocketmq.store.MessageArrivingListener;
+import java.util.Map;
+
public class NotifyMessageArrivingListener implements MessageArrivingListener {
private final PullRequestHoldService pullRequestHoldService;
@@ -27,7 +29,9 @@ public class NotifyMessageArrivingListener implements MessageArrivingListener {
}
@Override
- public void arriving(String topic, int queueId, long logicOffset, long tagsCode) {
- this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode);
+ public void arriving(String topic, int queueId, long logicOffset, long tagsCode,
+ long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
+ this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode,
+ msgStoreTime, filterBitMap, properties);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequest.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequest.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequest.java
index b66344f..045ab9b 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequest.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequest.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.longpolling;
import io.netty.channel.Channel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.store.MessageFilter;
public class PullRequest {
private final RemotingCommand requestCommand;
@@ -27,15 +28,18 @@ public class PullRequest {
private final long suspendTimestamp;
private final long pullFromThisOffset;
private final SubscriptionData subscriptionData;
+ private final MessageFilter messageFilter;
public PullRequest(RemotingCommand requestCommand, Channel clientChannel, long timeoutMillis, long suspendTimestamp,
- long pullFromThisOffset, SubscriptionData subscriptionData) {
+ long pullFromThisOffset, SubscriptionData subscriptionData,
+ MessageFilter messageFilter) {
this.requestCommand = requestCommand;
this.clientChannel = clientChannel;
this.timeoutMillis = timeoutMillis;
this.suspendTimestamp = suspendTimestamp;
this.pullFromThisOffset = pullFromThisOffset;
this.subscriptionData = subscriptionData;
+ this.messageFilter = messageFilter;
}
public RemotingCommand getRequestCommand() {
@@ -61,4 +65,8 @@ public class PullRequest {
public SubscriptionData getSubscriptionData() {
return subscriptionData;
}
+
+ public MessageFilter getMessageFilter() {
+ return messageFilter;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
index fdba50d..1a53db1 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
@@ -18,13 +18,13 @@ package org.apache.rocketmq.broker.longpolling;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.SystemClock;
import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.store.DefaultMessageFilter;
-import org.apache.rocketmq.store.MessageFilter;
+import org.apache.rocketmq.store.ConsumeQueueExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,7 +33,6 @@ public class PullRequestHoldService extends ServiceThread {
private static final String TOPIC_QUEUEID_SEPARATOR = "@";
private final BrokerController brokerController;
private final SystemClock systemClock = new SystemClock();
- private final MessageFilter messageFilter = new DefaultMessageFilter();
private ConcurrentHashMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable =
new ConcurrentHashMap<String, ManyPullRequest>(1024);
@@ -110,10 +109,11 @@ public class PullRequestHoldService extends ServiceThread {
}
public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset) {
- notifyMessageArriving(topic, queueId, maxOffset, null);
+ notifyMessageArriving(topic, queueId, maxOffset, null, 0, null, null);
}
- public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode) {
+ public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode,
+ long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
String key = this.buildKey(topic, queueId);
ManyPullRequest mpr = this.pullRequestTable.get(key);
if (mpr != null) {
@@ -128,7 +128,14 @@ public class PullRequestHoldService extends ServiceThread {
}
if (newestOffset > request.getPullFromThisOffset()) {
- if (this.messageFilter.isMessageMatched(request.getSubscriptionData(), tagsCode)) {
+ boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode,
+ new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));
+ // match by bit map, need eval again when properties is not null.
+ if (match && properties != null) {
+ match = request.getMessageFilter().isMatchedByCommitLog(null, properties);
+ }
+
+ if (match) {
try {
this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
request.getRequestCommand());
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 039c942..6c2a987 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -50,7 +50,7 @@ import org.slf4j.LoggerFactory;
public class BrokerOuterAPI {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final RemotingClient remotingClient;
- private final TopAddressing topAddressing = new TopAddressing(MixAll.WS_ADDR);
+ private final TopAddressing topAddressing = new TopAddressing(MixAll.getWSAddr());
private String nameSrvAddr = null;
public BrokerOuterAPI(final NettyClientConfig nettyClientConfig) {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
index 00257fd..8ded973 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
@@ -18,11 +18,14 @@
package org.apache.rocketmq.broker.plugin;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.Set;
import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.store.CommitLogDispatcher;
+import org.apache.rocketmq.store.ConsumeQueue;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.MessageExtBrokerInner;
+import org.apache.rocketmq.store.MessageFilter;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.QueryMessageResult;
@@ -84,8 +87,8 @@ public abstract class AbstractPluginMessageStore implements MessageStore {
@Override
public GetMessageResult getMessage(String group, String topic, int queueId, long offset,
- int maxMsgNums, SubscriptionData subscriptionData) {
- return next.getMessage(group, topic, queueId, offset, maxMsgNums, subscriptionData);
+ int maxMsgNums, final MessageFilter messageFilter) {
+ return next.getMessage(group, topic, queueId, offset, maxMsgNums, messageFilter);
}
@Override
@@ -234,4 +237,13 @@ public abstract class AbstractPluginMessageStore implements MessageStore {
next.setConfirmOffset(phyOffset);
}
+ @Override
+ public LinkedList<CommitLogDispatcher> getDispatcherList() {
+ return next.getDispatcherList();
+ }
+
+ @Override
+ public ConsumeQueue getConsumeQueue(String topic, int queueId) {
+ return next.getConsumeQueue(topic, queueId);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index e35316d..daea53c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.broker.processor;
+import com.alibaba.fastjson.JSON;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.io.UnsupportedEncodingException;
@@ -32,6 +33,8 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
+import org.apache.rocketmq.broker.filter.ConsumerFilterData;
+import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
@@ -49,6 +52,7 @@ import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
import org.apache.rocketmq.common.protocol.body.BrokerStatsItem;
import org.apache.rocketmq.common.protocol.body.Connection;
+import org.apache.rocketmq.common.protocol.body.ConsumeQueueData;
import org.apache.rocketmq.common.protocol.body.ConsumeStatsList;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
import org.apache.rocketmq.common.protocol.body.GroupList;
@@ -56,6 +60,7 @@ import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.LockBatchRequestBody;
import org.apache.rocketmq.common.protocol.body.LockBatchResponseBody;
import org.apache.rocketmq.common.protocol.body.ProducerConnection;
+import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
import org.apache.rocketmq.common.protocol.body.QueryConsumeTimeSpanBody;
import org.apache.rocketmq.common.protocol.body.QueryCorrectionOffsetBody;
import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
@@ -81,6 +86,7 @@ import org.apache.rocketmq.common.protocol.header.GetMinOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.GetProducerConnectionListRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetTopicStatsInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.QueryConsumeQueueRequestHeader;
import org.apache.rocketmq.common.protocol.header.QueryConsumeTimeSpanRequestHeader;
import org.apache.rocketmq.common.protocol.header.QueryCorrectionOffsetHeader;
import org.apache.rocketmq.common.protocol.header.QueryTopicConsumeByWhoRequestHeader;
@@ -94,6 +100,7 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.stats.StatsItem;
import org.apache.rocketmq.common.stats.StatsSnapshot;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.filter.util.BitsArray;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
@@ -101,7 +108,10 @@ import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.store.ConsumeQueue;
+import org.apache.rocketmq.store.ConsumeQueueExt;
import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.MessageFilter;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -187,6 +197,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
return ViewBrokerStatsData(ctx, request);
case RequestCode.GET_BROKER_CONSUME_STATS:
return fetchAllConsumeStatsInBroker(ctx, request);
+ case RequestCode.QUERY_CONSUME_QUEUE:
+ return queryConsumeQueue(ctx, request);
default:
break;
}
@@ -1244,4 +1256,83 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
}
}
+ private RemotingCommand queryConsumeQueue(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+ QueryConsumeQueueRequestHeader requestHeader =
+ (QueryConsumeQueueRequestHeader) request.decodeCommandCustomHeader(QueryConsumeQueueRequestHeader.class);
+
+ RemotingCommand response = RemotingCommand.createResponseCommand(null);
+
+ ConsumeQueue consumeQueue = this.brokerController.getMessageStore().getConsumeQueue(requestHeader.getTopic(),
+ requestHeader.getQueueId());
+ if (consumeQueue == null) {
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark(String.format("%d@%s is not exist!", requestHeader.getQueueId(), requestHeader.getTopic()));
+ return response;
+ }
+
+ QueryConsumeQueueResponseBody body = new QueryConsumeQueueResponseBody();
+ response.setCode(ResponseCode.SUCCESS);
+ response.setBody(body.encode());
+
+ body.setMaxQueueIndex(consumeQueue.getMaxOffsetInQueue());
+ body.setMinQueueIndex(consumeQueue.getMinOffsetInQueue());
+
+ MessageFilter messageFilter = null;
+ if (requestHeader.getConsumerGroup() != null) {
+ SubscriptionData subscriptionData = this.brokerController.getConsumerManager().findSubscriptionData(
+ requestHeader.getConsumerGroup(), requestHeader.getTopic()
+ );
+ body.setSubscriptionData(subscriptionData);
+ if (subscriptionData == null) {
+ body.setFilterData(String.format("%s@%s is not online!", requestHeader.getConsumerGroup(), requestHeader.getTopic()));
+ } else {
+ ConsumerFilterData filterData = this.brokerController.getConsumerFilterManager()
+ .get(requestHeader.getTopic(), requestHeader.getConsumerGroup());
+ body.setFilterData(JSON.toJSONString(filterData, true));
+
+ messageFilter = new ExpressionMessageFilter(subscriptionData, filterData,
+ this.brokerController.getConsumerFilterManager());
+ }
+ }
+
+ SelectMappedBufferResult result = consumeQueue.getIndexBuffer(requestHeader.getIndex());
+ if (result == null) {
+ response.setRemark(String.format("Index %d of %d@%s is not exist!", requestHeader.getIndex(), requestHeader.getQueueId(), requestHeader.getTopic()));
+ return response;
+ }
+ try {
+ List<ConsumeQueueData> queues = new ArrayList<>();
+ for (int i = 0; i < result.getSize() && i < requestHeader.getCount() * ConsumeQueue.CQ_STORE_UNIT_SIZE; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
+ ConsumeQueueData one = new ConsumeQueueData();
+ one.setPhysicOffset(result.getByteBuffer().getLong());
+ one.setPhysicSize(result.getByteBuffer().getInt());
+ one.setTagsCode(result.getByteBuffer().getLong());
+
+ if (!consumeQueue.isExtAddr(one.getTagsCode())) {
+ queues.add(one);
+ continue;
+ }
+
+ ConsumeQueueExt.CqExtUnit cqExtUnit = consumeQueue.getExt(one.getTagsCode());
+ if (cqExtUnit != null) {
+ one.setExtendDataJson(JSON.toJSONString(cqExtUnit));
+ if (cqExtUnit.getFilterBitMap() != null) {
+ one.setBitMap(BitsArray.create(cqExtUnit.getFilterBitMap()).toString());
+ }
+ if (messageFilter != null) {
+ one.setEval(messageFilter.isMatchedByConsumeQueue(cqExtUnit.getTagsCode(), cqExtUnit));
+ }
+ } else {
+ one.setMsg("Cq extend not exist!addr: " + one.getTagsCode());
+ }
+
+ queues.add(one);
+ }
+ body.setQueueData(queues);
+ } finally {
+ result.release();
+ }
+
+ return response;
+ }
}