You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/08/01 07:23:25 UTC

[27/50] [abbrv] incubator-rocketmq git commit: [ROCKETMQ-121]Support message filtering based on SQL92 closes apache/incubator-rocketmq#82

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
index 6349ffc..67807a8 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
@@ -22,15 +22,19 @@ import org.apache.rocketmq.broker.client.ClientChannelInfo;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.constant.PermName;
+import org.apache.rocketmq.common.filter.ExpressionType;
 import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.CheckClientRequestBody;
 import org.apache.rocketmq.common.protocol.header.UnregisterClientRequestHeader;
 import org.apache.rocketmq.common.protocol.header.UnregisterClientResponseHeader;
 import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
 import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
 import org.apache.rocketmq.common.protocol.heartbeat.ProducerData;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.common.sysflag.TopicSysFlag;
+import org.apache.rocketmq.filter.FilterFactory;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
@@ -54,6 +58,8 @@ public class ClientManageProcessor implements NettyRequestProcessor {
                 return this.heartBeat(ctx, request);
             case RequestCode.UNREGISTER_CLIENT:
                 return this.unregisterClient(ctx, request);
+            case RequestCode.CHECK_CLIENT_CONFIG:
+                return this.checkClientConfig(ctx, request);
             default:
                 break;
         }
@@ -157,4 +163,42 @@ public class ClientManageProcessor implements NettyRequestProcessor {
         response.setRemark(null);
         return response;
     }
+
+    /**
+     * Handles {@code CHECK_CLIENT_CONFIG}: validates the subscription carried in the request body.
+     * Tag-type or absent subscriptions succeed at once; any other expression type needs property
+     * filtering enabled in the broker config and an expression that compiles.
+     */
+    public RemotingCommand checkClientConfig(ChannelHandlerContext ctx, RemotingCommand request)
+        throws RemotingCommandException {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        CheckClientRequestBody requestBody = CheckClientRequestBody.decode(request.getBody(),
+            CheckClientRequestBody.class);
+        SubscriptionData subscriptionData = requestBody == null ? null : requestBody.getSubscriptionData();
+
+        // Nothing to check, or plain tag filtering: accept without compiling anything.
+        if (subscriptionData == null || ExpressionType.isTagType(subscriptionData.getExpressionType())) {
+            response.setCode(ResponseCode.SUCCESS);
+            response.setRemark(null);
+            return response;
+        }
+        if (!this.brokerController.getBrokerConfig().isEnablePropertyFilter()) {
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("The broker does not support consumer to filter message by " + subscriptionData.getExpressionType());
+            return response;
+        }
+
+        try {
+            FilterFactory.INSTANCE.get(subscriptionData.getExpressionType()).compile(subscriptionData.getSubString());
+        } catch (Exception e) {
+            log.warn("Client {}@{} filter message, but failed to compile expression! sub={}, error={}",
+                requestBody.getClientId(), requestBody.getGroup(), requestBody.getSubscriptionData(), e.getMessage());
+            response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
+            response.setRemark(e.getMessage());
+            return response;
+        }
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index 89967d8..10945da 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -25,6 +25,10 @@ import java.nio.ByteBuffer;
 import java.util.List;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
+import org.apache.rocketmq.broker.filter.ConsumerFilterData;
+import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
+import org.apache.rocketmq.broker.filter.ExpressionForRetryMessageFilter;
+import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
 import org.apache.rocketmq.broker.longpolling.PullRequest;
 import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
 import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
@@ -34,6 +38,7 @@ import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.TopicFilterType;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.constant.PermName;
+import org.apache.rocketmq.common.filter.ExpressionType;
 import org.apache.rocketmq.common.filter.FilterAPI;
 import org.apache.rocketmq.common.help.FAQUrl;
 import org.apache.rocketmq.common.message.MessageDecoder;
@@ -54,6 +59,7 @@ import org.apache.rocketmq.remoting.netty.RequestTask;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.store.GetMessageResult;
 import org.apache.rocketmq.store.MessageExtBrokerInner;
+import org.apache.rocketmq.store.MessageFilter;
 import org.apache.rocketmq.store.PutMessageResult;
 import org.apache.rocketmq.store.config.BrokerRole;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
@@ -142,13 +148,22 @@ public class PullMessageProcessor implements NettyRequestProcessor {
         }
 
         SubscriptionData subscriptionData = null;
+        ConsumerFilterData consumerFilterData = null;
         if (hasSubscriptionFlag) {
             try {
-                subscriptionData = FilterAPI.buildSubscriptionData(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
-                    requestHeader.getSubscription());
+                subscriptionData = FilterAPI.build(
+                    requestHeader.getTopic(), requestHeader.getSubscription(), requestHeader.getExpressionType()
+                );
+                if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
+                    consumerFilterData = ConsumerFilterManager.build(
+                        requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getSubscription(),
+                        requestHeader.getExpressionType(), requestHeader.getSubVersion()
+                    );
+                    assert consumerFilterData != null;
+                }
             } catch (Exception e) {
                 LOG.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(), //
-                        requestHeader.getConsumerGroup());
+                    requestHeader.getConsumerGroup());
                 response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
                 response.setRemark("parse the consumer's subscription failed");
                 return response;
@@ -180,16 +195,48 @@ public class PullMessageProcessor implements NettyRequestProcessor {
 
             if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) {
                 LOG.warn("The broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(),
-                        subscriptionData.getSubString());
+                    subscriptionData.getSubString());
                 response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST);
                 response.setRemark("the consumer's subscription not latest");
                 return response;
             }
+            if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
+                consumerFilterData = this.brokerController.getConsumerFilterManager().get(requestHeader.getTopic(),
+                    requestHeader.getConsumerGroup());
+                if (consumerFilterData == null) {
+                    response.setCode(ResponseCode.FILTER_DATA_NOT_EXIST);
+                    response.setRemark("The broker's consumer filter data is not exist!Your expression may be wrong!");
+                    return response;
+                }
+                if (consumerFilterData.getClientVersion() < requestHeader.getSubVersion()) {
+                    LOG.warn("The broker's consumer filter data is not latest, group: {}, topic: {}, serverV: {}, clientV: {}",
+                        requestHeader.getConsumerGroup(), requestHeader.getTopic(), consumerFilterData.getClientVersion(), requestHeader.getSubVersion());
+                    response.setCode(ResponseCode.FILTER_DATA_NOT_LATEST);
+                    response.setRemark("the consumer's consumer filter data not latest");
+                    return response;
+                }
+            }
+        }
+
+        if (!ExpressionType.isTagType(subscriptionData.getExpressionType())
+            && !this.brokerController.getBrokerConfig().isEnablePropertyFilter()) {
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("The broker does not support consumer to filter message by " + subscriptionData.getExpressionType());
+            return response;
+        }
+
+        MessageFilter messageFilter;
+        if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {
+            messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,
+                this.brokerController.getConsumerFilterManager());
+        } else {
+            messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,
+                this.brokerController.getConsumerFilterManager());
         }
 
         final GetMessageResult getMessageResult =
             this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
-                requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), subscriptionData);
+                requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
         if (getMessageResult != null) {
             response.setRemark(getMessageResult.getStatus().name());
             responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
@@ -368,7 +415,7 @@ public class PullMessageProcessor implements NettyRequestProcessor {
                         long offset = requestHeader.getQueueOffset();
                         int queueId = requestHeader.getQueueId();
                         PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
-                            this.brokerController.getMessageStore().now(), offset, subscriptionData);
+                            this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
                         this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
                         response = null;
                         break;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/broker/src/test/java/org/apache/rocketmq/broker/filter/CommitLogDispatcherCalcBitMapTest.java
----------------------------------------------------------------------
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/filter/CommitLogDispatcherCalcBitMapTest.java b/broker/src/test/java/org/apache/rocketmq/broker/filter/CommitLogDispatcherCalcBitMapTest.java
new file mode 100644
index 0000000..87f6256
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/filter/CommitLogDispatcherCalcBitMapTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.filter;
+
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.filter.ExpressionType;
+import org.apache.rocketmq.filter.util.BitsArray;
+import org.apache.rocketmq.store.DispatchRequest;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class CommitLogDispatcherCalcBitMapTest {
+
+    /**
+     * Registers two SQL92 filters on topic0, then strips the expression from one and the bloom
+     * filter data from the other; dispatching must still yield a bit map, with every bit unset.
+     */
+    @Test
+    public void testDispatch_filterDataIllegal() {
+        BrokerConfig config = new BrokerConfig();
+        config.setEnableCalcFilterBitMap(true);
+
+        ConsumerFilterManager manager = new ConsumerFilterManager();
+        manager.register("topic0", "CID_0", "a is not null and a >= 5",
+            ExpressionType.SQL92, System.currentTimeMillis());
+        manager.register("topic0", "CID_1", "a is not null and a >= 15",
+            ExpressionType.SQL92, System.currentTimeMillis());
+
+        // CID_0 loses its expression (raw and compiled), CID_1 loses its bloom filter data.
+        ConsumerFilterData withoutExpression = manager.get("topic0", "CID_0");
+        withoutExpression.setExpression(null);
+        withoutExpression.setCompiledExpression(null);
+
+        ConsumerFilterData withoutBloomData = manager.get("topic0", "CID_1");
+        withoutBloomData.setBloomFilterData(null);
+
+        CommitLogDispatcherCalcBitMap dispatcher =
+            new CommitLogDispatcherCalcBitMap(config, manager);
+
+        // A single message on topic0 carrying the property a=5.
+        Map<String, String> props = new HashMap<String, String>(4);
+        props.put("a", String.valueOf(5));
+
+        DispatchRequest req = new DispatchRequest(
+            "topic0",
+            0,
+            123,
+            100,
+            (long) "tags0".hashCode(),
+            System.currentTimeMillis(),
+            0,
+            null,
+            UUID.randomUUID().toString(),
+            0,
+            0,
+            props
+        );
+
+        dispatcher.dispatch(req);
+
+        assertThat(req.getBitMap()).isNotNull();
+
+        // Neither of the two broken filters may have set a bit.
+        BitsArray bits = BitsArray.create(req.getBitMap(),
+            manager.getBloomFilter().getM());
+        for (int pos = 0; pos < bits.bitLength(); pos++) {
+            assertThat(bits.getBit(pos)).isFalse();
+        }
+    }
+
+    @Test
+    public void testDispatch_blankFilterData() {
+        BrokerConfig brokerConfig = new BrokerConfig();
+        brokerConfig.setEnableCalcFilterBitMap(true);
+
+        ConsumerFilterManager filterManager = new ConsumerFilterManager();
+
+        CommitLogDispatcherCalcBitMap calcBitMap = new CommitLogDispatcherCalcBitMap(brokerConfig,
+            filterManager);
+
+        for (int i = 0; i < 10; i++) {
+            Map<String, String> properties = new HashMap<String, String>(4);
+            properties.put("a", String.valueOf(i * 10 + 5));
+
+            String topic = "topic" + i;
+
+            DispatchRequest dispatchRequest = new DispatchRequest(
+                topic,
+                0,
+                i * 100 + 123,
+                100,
+                (long) ("tags" + i).hashCode(),
+                System.currentTimeMillis(),
+                i,
+                null,
+                UUID.randomUUID().toString(),
+                0,
+                0,
+                properties
+            );
+
+            calcBitMap.dispatch(dispatchRequest);
+
+            assertThat(dispatchRequest.getBitMap()).isNull();
+        }
+    }
+
+    @Test
+    public void testDispatch() {
+        BrokerConfig brokerConfig = new BrokerConfig();
+        brokerConfig.setEnableCalcFilterBitMap(true);
+
+        ConsumerFilterManager filterManager = ConsumerFilterManagerTest.gen(10, 10);
+
+        CommitLogDispatcherCalcBitMap calcBitMap = new CommitLogDispatcherCalcBitMap(brokerConfig,
+            filterManager);
+
+        for (int i = 0; i < 10; i++) {
+            Map<String, String> properties = new HashMap<String, String>(4);
+            properties.put("a", String.valueOf(i * 10 + 5));
+
+            String topic = "topic" + i;
+
+            DispatchRequest dispatchRequest = new DispatchRequest(
+                topic,
+                0,
+                i * 100 + 123,
+                100,
+                (long) ("tags" + i).hashCode(),
+                System.currentTimeMillis(),
+                i,
+                null,
+                UUID.randomUUID().toString(),
+                0,
+                0,
+                properties
+            );
+
+            calcBitMap.dispatch(dispatchRequest);
+
+            assertThat(dispatchRequest.getBitMap()).isNotNull();
+
+            BitsArray bits = BitsArray.create(dispatchRequest.getBitMap());
+
+            Collection<ConsumerFilterData> filterDatas = filterManager.get(topic);
+
+            for (ConsumerFilterData filterData : filterDatas) {
+
+                if (filterManager.getBloomFilter().isHit(filterData.getBloomFilterData(), bits)) {
+                    try {
+                        assertThat((Boolean) filterData.getCompiledExpression().evaluate(
+                            new MessageEvaluationContext(properties)
+                        )).isTrue();
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                        assertThat(true).isFalse();
+                    }
+                } else {
+                    try {
+                        assertThat((Boolean) filterData.getCompiledExpression().evaluate(
+                            new MessageEvaluationContext(properties)
+                        )).isFalse();
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                        assertThat(true).isFalse();
+                    }
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/broker/src/test/java/org/apache/rocketmq/broker/filter/ConsumerFilterManagerTest.java
----------------------------------------------------------------------
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/filter/ConsumerFilterManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/filter/ConsumerFilterManagerTest.java
new file mode 100644
index 0000000..c8412a8
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/filter/ConsumerFilterManagerTest.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.filter;
+
+import org.apache.rocketmq.common.filter.ExpressionType;
+import org.apache.rocketmq.common.filter.FilterAPI;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class ConsumerFilterManagerTest {
+
+    /**
+     * Builds a manager holding one SQL92 filter for every pair of topic ("topic0", ...) and consumer
+     * group ("CID_0", ...); the expression registered for group j is {@code expr(j)}.
+     * @param topicCount number of topics
+     * @param consumerCount number of consumer groups registered on each topic
+     * @return the populated manager
+     */
+    public static ConsumerFilterManager gen(int topicCount, int consumerCount) {
+        ConsumerFilterManager manager = new ConsumerFilterManager();
+        for (int t = 0; t < topicCount; t++) {
+            for (int c = 0; c < consumerCount; c++) {
+                manager.register("topic" + t, "CID_" + c, expr(c), ExpressionType.SQL92, System.currentTimeMillis());
+            }
+        }
+        return manager;
+    }
+
+    public static String expr(int i) { // SQL92 filter accepting a in the open range (10*(i-1), 10*(i+1))
+        return "a is not null and a > " + 10 * (i - 1) + " and a < " + 10 * (i + 1);
+    }
+
+    /**
+     * Re-registering with an expression that does not compile is rejected and also drops the
+     * filter registered before; a later valid expression can be registered again.
+     */
+    @Test
+    public void testRegister_newExpressionCompileErrorAndRemoveOld() {
+        ConsumerFilterManager filterManager = gen(10, 10);
+        assertThat(filterManager.get("topic9", "CID_9")).isNotNull();
+
+        String invalidExpr = "a between 10,20";
+        assertThat(filterManager.register("topic9", "CID_9", invalidExpr, ExpressionType.SQL92, System.currentTimeMillis() + 1))
+            .isFalse();
+        assertThat(filterManager.get("topic9", "CID_9")).isNull();
+
+        String validExpr = "a between 10 AND 20";
+        assertThat(filterManager.register("topic9", "CID_9", validExpr, ExpressionType.SQL92, System.currentTimeMillis() + 1))
+            .isTrue();
+
+        ConsumerFilterData filterData = filterManager.get("topic9", "CID_9");
+        assertThat(filterData).isNotNull();
+        assertThat(filterData.getExpression()).isEqualTo(validExpr);
+    }
+
+    /** Re-registering a group with a newer version replaces the stored expression. */
+    @Test
+    public void testRegister_change() {
+        ConsumerFilterManager filterManager = gen(10, 10);
+
+        // Fail with a clear assertion, rather than an NPE, if the generated filter is missing.
+        assertThat(filterManager.get("topic9", "CID_9")).isNotNull();
+
+        String newExpr = "a > 0 and a < 10";
+
+        filterManager.register("topic9", "CID_9", newExpr, ExpressionType.SQL92, System.currentTimeMillis() + 1);
+
+        ConsumerFilterData filterData = filterManager.get("topic9", "CID_9");
+
+        assertThat(filterData).isNotNull();
+        // Actual value goes first so a failure reports expected/actual the right way round.
+        assertThat(filterData.getExpression()).isEqualTo(newExpr);
+    }
+
+    /**
+     * A registration with a newer client version replaces the stored filter data, whereas one that
+     * reuses the current client version is refused and leaves the stored data equal to before.
+     */
+    @Test
+    public void testRegister() {
+        ConsumerFilterManager manager = gen(10, 10);
+
+        ConsumerFilterData original = manager.get("topic9", "CID_9");
+        assertThat(original).isNotNull();
+        assertThat(original.isDead()).isFalse();
+
+        // newer version: accepted, and the stored data is no longer equal to the original
+        assertThat(manager.register("topic9", "CID_9", "a is not null", ExpressionType.SQL92, System.currentTimeMillis() + 1000))
+            .isTrue();
+
+        ConsumerFilterData replaced = manager.get("topic9", "CID_9");
+        assertThat(replaced).isNotEqualTo(original);
+
+        // same version: refused, and the stored data stays equal to what was there
+        assertThat(manager.register(
+            "topic9", "CID_9", "a is null", ExpressionType.SQL92, replaced.getClientVersion()
+        )).isFalse();
+
+        ConsumerFilterData afterRefusal = manager.get("topic9", "CID_9");
+        assertThat(replaced).isEqualTo(afterRefusal);
+    }
+
+    /**
+     * After its group has been unregistered, registering the same topic, group and expression
+     * again must leave filter data that is present and not dead.
+     */
+    @Test
+    public void testRegister_reAlive() {
+        ConsumerFilterManager manager = gen(10, 10);
+
+        ConsumerFilterData before = manager.get("topic9", "CID_9");
+
+        assertThat(before).isNotNull();
+        assertThat(before.isDead()).isFalse();
+
+        // make dead
+        manager.unRegister("CID_9");
+
+        // reAlive: register again with the values read from the earlier filter data and the
+        // current time as version
+        manager.register(before.getTopic(), before.getConsumerGroup(), before.getExpression(),
+            before.getExpressionType(), System.currentTimeMillis());
+
+        ConsumerFilterData after = manager.get("topic9", "CID_9");
+
+        assertThat(after).isNotNull();
+        assertThat(after.isDead()).isFalse();
+    }
+
+    /**
+     * Registers ten SQL92 subscriptions for one group in a single call and expects one filter per
+     * topic, each carrying bloom filter data the manager's bloom filter accepts as valid.
+     *
+     * @throws Exception if a subscription cannot be built; it is propagated so that the real
+     *                   cause fails the test
+     */
+    @Test
+    public void testRegister_bySubscriptionData() throws Exception {
+        ConsumerFilterManager filterManager = new ConsumerFilterManager();
+        List<SubscriptionData> subscriptionDatas = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            subscriptionDatas.add(
+                FilterAPI.build(
+                    "topic" + i,
+                    "a is not null and a > " + i,
+                    ExpressionType.SQL92
+                )
+            );
+        }
+
+        filterManager.register("CID_0", subscriptionDatas);
+        Collection<ConsumerFilterData> filterDatas = filterManager.getByGroup("CID_0");
+
+        assertThat(filterDatas).isNotNull();
+        assertThat(filterDatas).hasSize(10);
+
+        Iterator<ConsumerFilterData> iterator = filterDatas.iterator();
+        while (iterator.hasNext()) {
+            ConsumerFilterData filterData = iterator.next();
+            assertThat(filterData).isNotNull();
+            assertThat(filterManager.getBloomFilter().isValid(filterData.getBloomFilterData())).isTrue();
+        }
+    }
+
+    @Test
+    public void testRegister_tag() {
+        ConsumerFilterManager filterManager = new ConsumerFilterManager();
+
+        assertThat(filterManager.register("topic0", "CID_0", "*", null, System.currentTimeMillis())).isFalse();
+
+        Collection<ConsumerFilterData> filterDatas = filterManager.getByGroup("CID_0");
+
+        assertThat(filterDatas).isNullOrEmpty();
+    }
+
+    /** Unregistering a consumer group flags the filter data it had registered as dead. */
+    @Test
+    public void testUnregister() {
+        ConsumerFilterManager manager = gen(10, 10);
+        ConsumerFilterData data = manager.get("topic9", "CID_9");
+
+        assertThat(data).isNotNull();
+        assertThat(data.isDead()).isFalse();
+
+        // The same instance fetched earlier is inspected again; it is not looked up a second time.
+        manager.unRegister("CID_9");
+        assertThat(data.isDead()).isTrue();
+    }
+
+    /**
+     * Persists a populated manager and loads the data into a fresh one: the loaded filter must be
+     * present, flagged dead, and have a compiled expression.
+     */
+    @Test
+    public void testPersist() {
+        ConsumerFilterManager source = gen(10, 10);
+
+        try {
+            source.persist();
+
+            ConsumerFilterData live = source.get("topic9", "CID_9");
+            assertThat(live).isNotNull();
+            assertThat(live.isDead()).isFalse();
+
+            ConsumerFilterManager reloaded = new ConsumerFilterManager();
+            assertThat(reloaded.load()).isTrue();
+            ConsumerFilterData loaded = reloaded.get("topic9", "CID_9");
+            assertThat(loaded).isNotNull();
+            assertThat(loaded.isDead()).isTrue();
+            assertThat(loaded.getCompiledExpression()).isNotNull();
+        } finally {
+            deleteDirectory("./unit_test");
+        }
+    }
+
+    /**
+     * Filter data whose dead time lies more than 24 hours in the past must be gone after a
+     * persist/load round trip.
+     */
+    @Test
+    public void testPersist_clean() {
+        final int hourMillis = 60 * 60 * 1000;
+        final String topic = "topic9";
+        ConsumerFilterManager source = gen(10, 10);
+
+        for (int n = 0; n < 10; n++) {
+            ConsumerFilterData data = source.get(topic, "CID_" + n);
+
+            assertThat(data).isNotNull();
+            assertThat(data.isDead()).isFalse();
+
+            // born 26h ago, dead since 25h ago: dead for more than 24h
+            data.setBornTime(System.currentTimeMillis() - 26 * hourMillis);
+            data.setDeadTime(System.currentTimeMillis() - 25 * hourMillis);
+        }
+
+        try {
+            source.persist();
+
+            ConsumerFilterManager reloaded = new ConsumerFilterManager();
+
+            assertThat(reloaded.load()).isTrue();
+
+            assertThat(reloaded.get(topic, "CID_9")).isNull();
+
+            Collection<ConsumerFilterData> remaining = reloaded.get(topic);
+            assertThat(remaining).isNullOrEmpty();
+        } finally {
+            deleteDirectory("./unit_test");
+        }
+    }
+
+    /** Deletes the file or directory tree at {@code rootPath} via {@link #deleteFile(File)}. */
+    protected void deleteDirectory(String rootPath) {
+        deleteFile(new File(rootPath));
+    }
+
+    /** Removes {@code file}; for a directory its children are removed first, depth-first. */
+    protected void deleteFile(File file) {
+        File[] children = file.listFiles();
+        // listFiles() yields null for anything that is not a listable directory
+        int childCount = children == null ? 0 : children.length;
+        for (int idx = 0; idx < childCount; idx++) {
+            deleteFile(children[idx]);
+        }
+        file.delete(); // best effort: the result is ignored
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java
----------------------------------------------------------------------
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java b/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java
new file mode 100644
index 0000000..53e563e
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.filter;
+
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.filter.ExpressionType;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.store.CommitLogDispatcher;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.DispatchRequest;
+import org.apache.rocketmq.store.GetMessageResult;
+import org.apache.rocketmq.store.GetMessageStatus;
+import org.apache.rocketmq.store.MessageArrivingListener;
+import org.apache.rocketmq.store.MessageExtBrokerInner;
+import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
+import org.junit.Test;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class MessageStoreWithFilterTest {
+
+    private static final String msg = "Once, there was a chance for me!";
+    private static final byte[] msgBody = msg.getBytes();
+
+    private static final String topic = "topic";
+    private static final int queueId = 0;
+    private static final String storePath = "." + File.separator + "unit_test_store";
+    private static final int commitLogFileSize = 1024 * 1024 * 256;
+    private static final int cqFileSize = 300000 * 20;
+    private static final int cqExtFileSize = 300000 * 128;
+
+    private static SocketAddress BornHost;
+
+    private static SocketAddress StoreHost;
+
+    static {
+        try {
+            StoreHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123);
+        } catch (UnknownHostException e) {
+            e.printStackTrace();
+        }
+        try {
+            BornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0);
+        } catch (UnknownHostException e) {
+            e.printStackTrace();
+        }
+    }
+
+    public MessageExtBrokerInner buildMessage() {
+        MessageExtBrokerInner msg = new MessageExtBrokerInner();
+        msg.setTopic(topic);
+        msg.setTags("TAG1");
+        msg.setKeys("Hello");
+        msg.setBody(msgBody);
+        msg.setKeys(String.valueOf(System.currentTimeMillis()));
+        msg.setQueueId(queueId);
+        msg.setSysFlag(0);
+        msg.setBornTimestamp(System.currentTimeMillis());
+        msg.setStoreHost(StoreHost);
+        msg.setBornHost(BornHost);
+        for (int i = 1; i < 3; i++) {
+            msg.putUserProperty(String.valueOf(i), "imagoodperson" + i);
+        }
+        msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
+
+        return msg;
+    }
+
+    public MessageStoreConfig buildStoreConfig(int commitLogFileSize, int cqFileSize,
+                                               boolean enableCqExt, int cqExtFileSize) {
+        MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+        messageStoreConfig.setMapedFileSizeCommitLog(commitLogFileSize);
+        messageStoreConfig.setMapedFileSizeConsumeQueue(cqFileSize);
+        messageStoreConfig.setMappedFileSizeConsumeQueueExt(cqExtFileSize);
+        messageStoreConfig.setMessageIndexEnable(false);
+        messageStoreConfig.setEnableConsumeQueueExt(enableCqExt);
+
+        messageStoreConfig.setStorePathRootDir(storePath);
+        messageStoreConfig.setStorePathCommitLog(storePath + File.separator + "commitlog");
+
+        return messageStoreConfig;
+    }
+
+    protected DefaultMessageStore gen(ConsumerFilterManager filterManager) throws Exception {
+        MessageStoreConfig messageStoreConfig = buildStoreConfig(
+            commitLogFileSize, cqFileSize, true, cqExtFileSize
+        );
+
+        BrokerConfig brokerConfig = new BrokerConfig();
+        brokerConfig.setEnableCalcFilterBitMap(true);
+        brokerConfig.setMaxErrorRateOfBloomFilter(20);
+        brokerConfig.setExpectConsumerNumUseFilter(64);
+
+        DefaultMessageStore master = new DefaultMessageStore(
+            messageStoreConfig,
+            new BrokerStatsManager(brokerConfig.getBrokerClusterName()),
+            new MessageArrivingListener() {
+                @Override
+                public void arriving(String topic, int queueId, long logicOffset, long tagsCode,
+                                     long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
+//                    System.out.println(String.format("Msg coming: %s, %d, %d, %d",
+//                        topic, queueId, logicOffset, tagsCode));
+                }
+            }
+            , brokerConfig);
+
+        master.getDispatcherList().addFirst(new CommitLogDispatcher() {
+            @Override
+            public void dispatch(DispatchRequest request) {
+                try {
+//                    System.out.println(String.format("offset:%d, bitMap:%s", request.getCommitLogOffset(),
+//                        BitsArray.create(request.getBitMap()).toString()));
+                } catch (Throwable e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+        master.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(brokerConfig, filterManager));
+
+        assertThat(master.load()).isTrue();
+
+        master.start();
+
+        return master;
+    }
+
+    protected List<MessageExtBrokerInner> putMsg(DefaultMessageStore master, int topicCount, int msgCountPerTopic) throws Exception {
+        List<MessageExtBrokerInner> msgs = new ArrayList<MessageExtBrokerInner>();
+        for (int i = 0; i < topicCount; i++) {
+            String realTopic = topic + i;
+            for (int j = 0; j < msgCountPerTopic; j++) {
+                MessageExtBrokerInner msg = buildMessage();
+                msg.setTopic(realTopic);
+                msg.putUserProperty("a", String.valueOf(j * 10 + 5));
+                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
+
+                PutMessageResult result = master.putMessage(msg);
+
+                msg.setMsgId(result.getAppendMessageResult().getMsgId());
+
+                msgs.add(msg);
+            }
+        }
+
+        return msgs;
+    }
+
+    protected void deleteDirectory(String rootPath) {
+        File file = new File(rootPath);
+        deleteFile(file);
+    }
+
+    protected void deleteFile(File file) {
+        File[] subFiles = file.listFiles();
+        if (subFiles != null) {
+            for (File sub : subFiles) {
+                deleteFile(sub);
+            }
+        }
+
+        file.delete();
+    }
+
+    protected List<MessageExtBrokerInner> filtered(List<MessageExtBrokerInner> msgs, ConsumerFilterData filterData) {
+        List<MessageExtBrokerInner> filteredMsgs = new ArrayList<MessageExtBrokerInner>();
+
+        for (MessageExtBrokerInner messageExtBrokerInner : msgs) {
+
+            if (!messageExtBrokerInner.getTopic().equals(filterData.getTopic())) {
+                continue;
+            }
+
+            try {
+                Object evlRet = filterData.getCompiledExpression().evaluate(new MessageEvaluationContext(messageExtBrokerInner.getProperties()));
+
+                if (evlRet == null || !(evlRet instanceof Boolean) || (Boolean) evlRet) {
+                    filteredMsgs.add(messageExtBrokerInner);
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+                assertThat(true).isFalse();
+            }
+        }
+
+        return filteredMsgs;
+    }
+
+    @Test
+    public void testGetMessage_withFilterBitMapAndConsumerChanged() {
+        int topicCount = 10, msgPerTopic = 10;
+        ConsumerFilterManager filterManager = ConsumerFilterManagerTest.gen(topicCount, msgPerTopic);
+
+        DefaultMessageStore master = null;
+        try {
+            master = gen(filterManager);
+        } catch (Exception e) {
+            e.printStackTrace();
+            assertThat(true).isFalse();
+        }
+
+        try {
+            List<MessageExtBrokerInner> msgs = null;
+            try {
+                msgs = putMsg(master, topicCount, msgPerTopic);
+            } catch (Exception e) {
+                e.printStackTrace();
+                assertThat(true).isFalse();
+            }
+
+            // sleep to wait until the consume queue has been constructed.
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+                assertThat(true).isFalse();
+            }
+
+            // reset consumer;
+            String topic = "topic" + 0;
+            String resetGroup = "CID_" + 2;
+            String normalGroup = "CID_" + 3;
+
+            {
+                // reset CID_2@topic0 to get all messages.
+                SubscriptionData resetSubData = new SubscriptionData();
+                resetSubData.setExpressionType(ExpressionType.SQL92);
+                resetSubData.setTopic(topic);
+                resetSubData.setClassFilterMode(false);
+                resetSubData.setSubString("a is not null OR a is null");
+
+                ConsumerFilterData resetFilterData = ConsumerFilterManager.build(topic,
+                    resetGroup, resetSubData.getSubString(), resetSubData.getExpressionType(),
+                    System.currentTimeMillis());
+
+                GetMessageResult resetGetResult = master.getMessage(resetGroup, topic, queueId, 0, 1000,
+                    new ExpressionMessageFilter(resetSubData, resetFilterData, filterManager));
+
+                try {
+                    assertThat(resetGetResult).isNotNull();
+
+                    List<MessageExtBrokerInner> filteredMsgs = filtered(msgs, resetFilterData);
+
+                    assertThat(resetGetResult.getMessageBufferList().size()).isEqualTo(filteredMsgs.size());
+                } finally {
+                    resetGetResult.release();
+                }
+            }
+
+            {
+                ConsumerFilterData normalFilterData = filterManager.get(topic, normalGroup);
+                assertThat(normalFilterData).isNotNull();
+                assertThat(normalFilterData.getBornTime()).isLessThan(System.currentTimeMillis());
+
+                SubscriptionData normalSubData = new SubscriptionData();
+                normalSubData.setExpressionType(normalFilterData.getExpressionType());
+                normalSubData.setTopic(topic);
+                normalSubData.setClassFilterMode(false);
+                normalSubData.setSubString(normalFilterData.getExpression());
+
+                List<MessageExtBrokerInner> filteredMsgs = filtered(msgs, normalFilterData);
+
+                GetMessageResult normalGetResult = master.getMessage(normalGroup, topic, queueId, 0, 1000,
+                    new ExpressionMessageFilter(normalSubData, normalFilterData, filterManager));
+
+                try {
+                    assertThat(normalGetResult).isNotNull();
+                    assertThat(normalGetResult.getMessageBufferList().size()).isEqualTo(filteredMsgs.size());
+                } finally {
+                    normalGetResult.release();
+                }
+            }
+        } finally {
+            master.shutdown();
+            master.destroy();
+            deleteDirectory(storePath);
+        }
+    }
+
+    @Test
+    public void testGetMessage_withFilterBitMap() {
+        int topicCount = 10, msgPerTopic = 500;
+        ConsumerFilterManager filterManager = ConsumerFilterManagerTest.gen(topicCount, msgPerTopic);
+
+        DefaultMessageStore master = null;
+        try {
+            master = gen(filterManager);
+        } catch (Exception e) {
+            e.printStackTrace();
+            assertThat(true).isFalse();
+        }
+
+        try {
+            List<MessageExtBrokerInner> msgs = null;
+            try {
+                msgs = putMsg(master, topicCount, msgPerTopic);
+                // sleep to wait until the consume queue has been constructed.
+                Thread.sleep(1000);
+            } catch (Exception e) {
+                e.printStackTrace();
+                assertThat(true).isFalse();
+            }
+
+            for (int i = 0; i < topicCount; i++) {
+                String realTopic = topic + i;
+
+                for (int j = 0; j < msgPerTopic; j++) {
+                    String group = "CID_" + j;
+
+                    ConsumerFilterData filterData = filterManager.get(realTopic, group);
+                    assertThat(filterData).isNotNull();
+
+                    List<MessageExtBrokerInner> filteredMsgs = filtered(msgs, filterData);
+
+                    SubscriptionData subscriptionData = new SubscriptionData();
+                    subscriptionData.setExpressionType(filterData.getExpressionType());
+                    subscriptionData.setTopic(filterData.getTopic());
+                    subscriptionData.setClassFilterMode(false);
+                    subscriptionData.setSubString(filterData.getExpression());
+
+                    GetMessageResult getMessageResult = master.getMessage(group, realTopic, queueId, 0, 10000,
+                        new ExpressionMessageFilter(subscriptionData, filterData, filterManager));
+                    String assertMsg = group + "-" + realTopic;
+                    try {
+                        assertThat(getMessageResult).isNotNull();
+                        assertThat(GetMessageStatus.FOUND).isEqualTo(getMessageResult.getStatus());
+                        assertThat(getMessageResult.getMessageBufferList()).isNotNull().isNotEmpty();
+                        assertThat(getMessageResult.getMessageBufferList().size()).isEqualTo(filteredMsgs.size());
+
+                        for (ByteBuffer buffer : getMessageResult.getMessageBufferList()) {
+                            MessageExt messageExt = MessageDecoder.decode(buffer.slice(), false);
+                            assertThat(messageExt).isNotNull();
+
+                            Object evlRet = null;
+                            try {
+                                evlRet = filterData.getCompiledExpression().evaluate(new MessageEvaluationContext(messageExt.getProperties()));
+                            } catch (Exception e) {
+                                e.printStackTrace();
+                                assertThat(true).isFalse();
+                            }
+
+                            assertThat(evlRet).isNotNull().isEqualTo(Boolean.TRUE);
+
+                            // check
+                            boolean find = false;
+                            for (MessageExtBrokerInner messageExtBrokerInner : filteredMsgs) {
+                                if (messageExtBrokerInner.getMsgId().equals(messageExt.getMsgId())) {
+                                    find = true;
+                                }
+                            }
+                            assertThat(find).isTrue();
+                        }
+                    } finally {
+                        getMessageResult.release();
+                    }
+                }
+            }
+        } finally {
+            master.shutdown();
+            master.destroy();
+            deleteDirectory(storePath);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
----------------------------------------------------------------------
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
index d3d9812..941d4a7 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Set;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.client.ClientChannelInfo;
+import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
 import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
 import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
 import org.apache.rocketmq.common.BrokerConfig;
@@ -126,7 +127,7 @@ public class PullMessageProcessorTest {
     @Test
     public void testProcessRequest_Found() throws RemotingCommandException {
         GetMessageResult getMessageResult = createGetMessageResult();
-        when(messageStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any(SubscriptionData.class))).thenReturn(getMessageResult);
+        when(messageStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any(ExpressionMessageFilter.class))).thenReturn(getMessageResult);
 
         final RemotingCommand request = createPullMsgCommand(RequestCode.PULL_MESSAGE);
         RemotingCommand response = pullMessageProcessor.processRequest(handlerContext, request);
@@ -137,7 +138,7 @@ public class PullMessageProcessorTest {
     @Test
     public void testProcessRequest_FoundWithHook() throws RemotingCommandException {
         GetMessageResult getMessageResult = createGetMessageResult();
-        when(messageStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any(SubscriptionData.class))).thenReturn(getMessageResult);
+        when(messageStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any(ExpressionMessageFilter.class))).thenReturn(getMessageResult);
         List<ConsumeMessageHook> consumeMessageHookList = new ArrayList<>();
         final ConsumeMessageContext[] messageContext = new ConsumeMessageContext[1];
         ConsumeMessageHook consumeMessageHook = new ConsumeMessageHook() {
@@ -168,7 +169,7 @@ public class PullMessageProcessorTest {
     public void testProcessRequest_MsgWasRemoving() throws RemotingCommandException {
         GetMessageResult getMessageResult = createGetMessageResult();
         getMessageResult.setStatus(GetMessageStatus.MESSAGE_WAS_REMOVING);
-        when(messageStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any(SubscriptionData.class))).thenReturn(getMessageResult);
+        when(messageStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any(ExpressionMessageFilter.class))).thenReturn(getMessageResult);
 
         final RemotingCommand request = createPullMsgCommand(RequestCode.PULL_MESSAGE);
         RemotingCommand response = pullMessageProcessor.processRequest(handlerContext, request);
@@ -180,7 +181,7 @@ public class PullMessageProcessorTest {
     public void testProcessRequest_NoMsgInQueue() throws RemotingCommandException {
         GetMessageResult getMessageResult = createGetMessageResult();
         getMessageResult.setStatus(GetMessageStatus.NO_MESSAGE_IN_QUEUE);
-        when(messageStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any(SubscriptionData.class))).thenReturn(getMessageResult);
+        when(messageStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any(ExpressionMessageFilter.class))).thenReturn(getMessageResult);
 
         final RemotingCommand request = createPullMsgCommand(RequestCode.PULL_MESSAGE);
         RemotingCommand response = pullMessageProcessor.processRequest(handlerContext, request);

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index 3903fe2..9c9b59e 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -519,6 +519,21 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
     }
 
     /**
+     * Subscribe a topic by message selector.
+     *
+     * @see org.apache.rocketmq.client.consumer.MessageSelector#bySql
+     * @see org.apache.rocketmq.client.consumer.MessageSelector#byTag
+     *
+     * @param topic topic to consume.
+     * @param messageSelector {@link org.apache.rocketmq.client.consumer.MessageSelector}
+     * @throws MQClientException
+     */
+    @Override
+    public void subscribe(final String topic, final MessageSelector messageSelector) throws MQClientException {
+        this.defaultMQPushConsumerImpl.subscribe(topic, messageSelector);
+    }
+
+    /**
      * Un-subscribe the specified topic from subscription.
      * @param topic message topic
      */

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java
index 9255281..9c6c1f1 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java
@@ -70,6 +70,27 @@ public interface MQPushConsumer extends MQConsumer {
     void subscribe(final String topic, final String fullClassName, final String filterClassSource) throws MQClientException;
 
     /**
+     * Subscribe some topic with selector.
+     * <p>
+     * This method offers the same ability as {@link #subscribe(String, String)},
+     * and also supports other message selection types, such as {@link org.apache.rocketmq.common.filter.ExpressionType#SQL92}.
+     * </p>
+     * <p/>
+     * <p>
+     * Choose Tag: {@link MessageSelector#byTag(java.lang.String)}
+     * </p>
+     * <p/>
+     * <p>
+     * Choose SQL92: {@link MessageSelector#bySql(java.lang.String)}
+     * </p>
+     *
+     * @param topic topic to subscribe.
+     * @param selector message selector({@link MessageSelector}), can be null.
+     * @throws MQClientException
+     */
+    void subscribe(final String topic, final MessageSelector selector) throws MQClientException;
+
+    /**
      * Unsubscribe consumption some topic
      *
      * @param topic message topic

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/client/src/main/java/org/apache/rocketmq/client/consumer/MessageSelector.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MessageSelector.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MessageSelector.java
new file mode 100644
index 0000000..35a5181
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MessageSelector.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.consumer;
+
+import org.apache.rocketmq.common.filter.ExpressionType;
+
+/**
+ *
+ * Message selector: select message at server.
+ * <p>
+ *     Now, support:
+ *     <li>Tag: {@link org.apache.rocketmq.common.filter.ExpressionType#TAG}
+ *     </li>
+ *     <li>SQL92: {@link org.apache.rocketmq.common.filter.ExpressionType#SQL92}
+ *     </li>
+ * </p>
+ */
+public class MessageSelector {
+
+    /**
+     * Expression type, see {@link org.apache.rocketmq.common.filter.ExpressionType}; set once at construction.
+     */
+    private final String type;
+
+    /**
+     * Expression content; set once at construction.
+     */
+    private final String expression;
+
+    private MessageSelector(String type, String expression) {
+        this.type = type;
+        this.expression = expression;
+    }
+
+    /**
+     * Use SQL92 to select messages.
+     *
+     * @param sql if null or empty, will be treated as selecting all messages.
+     * @return a selector of type {@link ExpressionType#SQL92} carrying the given expression.
+     */
+    public static MessageSelector bySql(String sql) {
+        return new MessageSelector(ExpressionType.SQL92, sql);
+    }
+
+    /**
+     * Use tag to select messages.
+     *
+     * @param tag if null or empty or "*", will be treated as selecting all messages.
+     * @return a selector of type {@link ExpressionType#TAG} carrying the given expression.
+     */
+    public static MessageSelector byTag(String tag) {
+        return new MessageSelector(ExpressionType.TAG, tag);
+    }
+
+    public String getExpressionType() {
+        return type;
+    }
+
+    public String getExpression() {
+        return expression;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/client/src/main/java/org/apache/rocketmq/client/impl/FindBrokerResult.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/FindBrokerResult.java b/client/src/main/java/org/apache/rocketmq/client/impl/FindBrokerResult.java
index 295060e..4367a4c 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/FindBrokerResult.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/FindBrokerResult.java
@@ -19,10 +19,18 @@ package org.apache.rocketmq.client.impl;
 public class FindBrokerResult {
     private final String brokerAddr;
     private final boolean slave;
+    private final int brokerVersion;
 
     public FindBrokerResult(String brokerAddr, boolean slave) {
         this.brokerAddr = brokerAddr;
         this.slave = slave;
+        this.brokerVersion = 0;
+    }
+
+    public FindBrokerResult(String brokerAddr, boolean slave, int brokerVersion) {
+        this.brokerAddr = brokerAddr;
+        this.slave = slave;
+        this.brokerVersion = brokerVersion;
     }
 
     public String getBrokerAddr() {
@@ -32,4 +40,8 @@ public class FindBrokerResult {
     public boolean isSlave() {
         return slave;
     }
+
+    public int getBrokerVersion() {
+        return brokerVersion;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index ff25334..4244bdd 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -59,6 +59,7 @@ import org.apache.rocketmq.common.namesrv.TopAddressing;
 import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
+import org.apache.rocketmq.common.protocol.body.CheckClientRequestBody;
 import org.apache.rocketmq.common.protocol.body.ClusterInfo;
 import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
 import org.apache.rocketmq.common.protocol.body.ConsumeStatsList;
@@ -70,6 +71,7 @@ import org.apache.rocketmq.common.protocol.body.KVTable;
 import org.apache.rocketmq.common.protocol.body.LockBatchRequestBody;
 import org.apache.rocketmq.common.protocol.body.LockBatchResponseBody;
 import org.apache.rocketmq.common.protocol.body.ProducerConnection;
+import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
 import org.apache.rocketmq.common.protocol.body.QueryConsumeTimeSpanBody;
 import org.apache.rocketmq.common.protocol.body.QueryCorrectionOffsetBody;
 import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
@@ -103,6 +105,7 @@ import org.apache.rocketmq.common.protocol.header.GetTopicStatsInfoRequestHeader
 import org.apache.rocketmq.common.protocol.header.GetTopicsByClusterRequestHeader;
 import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
 import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
+import org.apache.rocketmq.common.protocol.header.QueryConsumeQueueRequestHeader;
 import org.apache.rocketmq.common.protocol.header.QueryConsumeTimeSpanRequestHeader;
 import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
 import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
@@ -129,6 +132,7 @@ import org.apache.rocketmq.common.protocol.header.namesrv.PutKVConfigRequestHead
 import org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerRequestHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerResponseHeader;
 import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.remoting.InvokeCallback;
@@ -168,7 +172,7 @@ public class MQClientAPIImpl {
     public MQClientAPIImpl(final NettyClientConfig nettyClientConfig, final ClientRemotingProcessor clientRemotingProcessor,
         RPCHook rpcHook, final ClientConfig clientConfig) {
         this.clientConfig = clientConfig;
-        topAddressing = new TopAddressing(MixAll.WS_ADDR, clientConfig.getUnitName());
+        topAddressing = new TopAddressing(MixAll.getWSAddr(), clientConfig.getUnitName());
         this.remotingClient = new NettyRemotingClient(nettyClientConfig, null);
         this.clientRemotingProcessor = clientRemotingProcessor;
 
@@ -843,7 +847,7 @@ public class MQClientAPIImpl {
         this.remotingClient.invokeOneway(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
     }
 
-    public void sendHearbeat(//
+    public int sendHearbeat(//
         final String addr, //
         final HeartbeatData heartbeatData, //
         final long timeoutMillis//
@@ -855,7 +859,7 @@ public class MQClientAPIImpl {
         assert response != null;
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
-                return;
+                return response.getVersion();
             }
             default:
                 break;
@@ -2024,4 +2028,51 @@ public class MQClientAPIImpl {
         return configMap;
     }
 
+    /**
+     * Sends a QUERY_CONSUME_QUEUE request to {@code brokerAddr} and decodes the response body.
+     *
+     * @throws MQClientException carrying the response code and remark when the code is not SUCCESS
+     */
+    public QueryConsumeQueueResponseBody queryConsumeQueue(final String brokerAddr, final String topic, final int queueId,
+                                                   final long index, final int count, final String consumerGroup,
+                                                   final long timeoutMillis) throws InterruptedException,
+        RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException {
+        final QueryConsumeQueueRequestHeader header = new QueryConsumeQueueRequestHeader();
+        header.setTopic(topic);
+        header.setQueueId(queueId);
+        header.setIndex(index);
+        header.setCount(count);
+        header.setConsumerGroup(consumerGroup);
+        final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUME_QUEUE, header);
+
+        final RemotingCommand reply = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), brokerAddr), request, timeoutMillis);
+        assert reply != null;
+        if (ResponseCode.SUCCESS != reply.getCode()) {
+            throw new MQClientException(reply.getCode(), reply.getRemark());
+        }
+        return QueryConsumeQueueResponseBody.decode(reply.getBody(), QueryConsumeQueueResponseBody.class);
+    }
+
+    /**
+     * Sends the client id, group and subscription to {@code brokerAddr} as a CHECK_CLIENT_CONFIG request.
+     *
+     * @throws MQClientException carrying the response code and remark when the code is not SUCCESS
+     */
+    public void checkClientInBroker(final String brokerAddr, final String consumerGroup,
+                                    final String clientId, final SubscriptionData subscriptionData,
+                                    final long timeoutMillis)
+        throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
+        RemotingConnectException, MQClientException {
+        final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CHECK_CLIENT_CONFIG, null);
+        final CheckClientRequestBody payload = new CheckClientRequestBody();
+        payload.setClientId(clientId);
+        payload.setGroup(consumerGroup);
+        payload.setSubscriptionData(subscriptionData);
+        request.setBody(payload.encode());
+        final RemotingCommand reply = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), brokerAddr), request, timeoutMillis);
+        assert reply != null;
+        if (ResponseCode.SUCCESS != reply.getCode()) {
+            throw new MQClientException(reply.getCode(), reply.getRemark());
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index 67f3ebe..2cafe29 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.apache.rocketmq.client.QueryResult;
 import org.apache.rocketmq.client.Validators;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.MessageSelector;
 import org.apache.rocketmq.client.consumer.PullCallback;
 import org.apache.rocketmq.client.consumer.PullResult;
 import org.apache.rocketmq.client.consumer.listener.MessageListener;
@@ -405,15 +406,16 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
             this.pullAPIWrapper.pullKernelImpl(//
                 pullRequest.getMessageQueue(), // 1
                 subExpression, // 2
-                subscriptionData.getSubVersion(), // 3
-                pullRequest.getNextOffset(), // 4
-                this.defaultMQPushConsumer.getPullBatchSize(), // 5
-                sysFlag, // 6
-                commitOffsetValue, // 7
-                BROKER_SUSPEND_MAX_TIME_MILLIS, // 8
-                CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, // 9
-                CommunicationMode.ASYNC, // 10
-                pullCallback// 11
+                subscriptionData.getExpressionType(), // 3
+                subscriptionData.getSubVersion(), // 4
+                pullRequest.getNextOffset(), // 5
+                this.defaultMQPushConsumer.getPullBatchSize(), // 6
+                sysFlag, // 7
+                commitOffsetValue, // 8
+                BROKER_SUSPEND_MAX_TIME_MILLIS, // 9
+                CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, // 10
+                CommunicationMode.ASYNC, // 11
+                pullCallback // 12
             );
         } catch (Exception e) {
             log.error("pullKernelImpl exception", e);
@@ -615,6 +617,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
         }
 
         this.updateTopicSubscribeInfoWhenSubscriptionChanged();
+        this.mQClientFactory.checkClientInBroker();
         this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
         this.mQClientFactory.rebalanceImmediately();
     }
@@ -836,6 +839,25 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
         }
     }
 
+    /**
+     * Subscribes to {@code topic} with a selector; a null selector falls back to {@code SubscriptionData.SUB_ALL}.
+     */
+    public void subscribe(final String topic, final MessageSelector messageSelector) throws MQClientException {
+        try {
+            if (messageSelector == null) {
+                subscribe(topic, SubscriptionData.SUB_ALL);
+            } else {
+                SubscriptionData selected = FilterAPI.build(topic, messageSelector.getExpression(), messageSelector.getExpressionType());
+                this.rebalanceImpl.getSubscriptionInner().put(topic, selected);
+                if (this.mQClientFactory != null) {
+                    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
+                }
+            }
+        } catch (Exception e) {
+            throw new MQClientException("subscription exception", e);
+        }
+    }
+
     public void suspend() {
         this.pause = true;
         log.info("suspend this consumer, {}", this.defaultMQPushConsumer.getConsumerGroup());

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
index 96e21e1..304a44a 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
@@ -33,7 +33,9 @@ import org.apache.rocketmq.client.impl.CommunicationMode;
 import org.apache.rocketmq.client.impl.FindBrokerResult;
 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
 import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.MQVersion;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.filter.ExpressionType;
 import org.apache.rocketmq.common.message.MessageAccessor;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageDecoder;
@@ -135,6 +137,7 @@ public class PullAPIWrapper {
     public PullResult pullKernelImpl(
         final MessageQueue mq,
         final String subExpression,
+        final String expressionType,
         final long subVersion,
         final long offset,
         final int maxNums,
@@ -156,6 +159,14 @@ public class PullAPIWrapper {
         }
 
         if (findBrokerResult != null) {
+            {
+                // check version
+                if (!ExpressionType.isTagType(expressionType)
+                    && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
+                    throw new MQClientException("The broker[" + mq.getBrokerName() + ", "
+                        + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);
+                }
+            }
             int sysFlagInner = sysFlag;
 
             if (findBrokerResult.isSlave()) {
@@ -173,6 +184,7 @@ public class PullAPIWrapper {
             requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
             requestHeader.setSubscription(subExpression);
             requestHeader.setSubVersion(subVersion);
+            requestHeader.setExpressionType(expressionType);
 
             String brokerAddr = findBrokerResult.getBrokerAddr();
             if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
@@ -192,6 +204,34 @@ public class PullAPIWrapper {
         throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
     }
 
+    /**
+     * Overload of {@code pullKernelImpl} that takes no expression type.
+     * <p>
+     * Every argument is forwarded unchanged to the variant that accepts an expression type,
+     * with {@link ExpressionType#TAG} supplied for that parameter.
+     */
+    public PullResult pullKernelImpl(
+        final MessageQueue mq,
+        final String subExpression,
+        final long subVersion,
+        final long offset,
+        final int maxNums,
+        final int sysFlag,
+        final long commitOffset,
+        final long brokerSuspendMaxTimeMillis,
+        final long timeoutMillis,
+        final CommunicationMode communicationMode,
+        final PullCallback pullCallback
+    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        return pullKernelImpl(
+            mq, subExpression,
+            ExpressionType.TAG, // fixed expression type for this overload
+            subVersion, offset, maxNums,
+            sysFlag, commitOffset,
+            brokerSuspendMaxTimeMillis, timeoutMillis, communicationMode, pullCallback
+        );
+    }
+
     public long recalculatePullFromWhichNode(final MessageQueue mq) {
         if (this.isConnectBrokerByUser()) {
             return this.defaultBrokerId;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index d7e02fe..a8c65b2 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -61,6 +61,7 @@ import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.ServiceState;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.PermName;
+import org.apache.rocketmq.common.filter.ExpressionType;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
@@ -98,6 +99,8 @@ public class MQClientInstance {
     private final Lock lockHeartbeat = new ReentrantLock();
     private final ConcurrentHashMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
         new ConcurrentHashMap<String, HashMap<Long, String>>();
+    private final ConcurrentHashMap<String/* Broker Name */, HashMap<String/* address */, Integer>> brokerVersionTable =
+        new ConcurrentHashMap<String, HashMap<String, Integer>>();
     private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
         @Override
         public Thread newThread(Runnable r) {
@@ -404,6 +407,44 @@ public class MQClientInstance {
         }
     }
 
+    /**
+     * Sends a check request for each non-TAG subscription of each consumer in {@code consumerTable}.
+     *
+     * @throws MQClientException if a check request fails or is rejected
+     */
+    public void checkClientInBroker() throws MQClientException {
+        for (Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
+            Set<SubscriptionData> subscriptionInner = entry.getValue().subscriptions();
+            if (subscriptionInner == null || subscriptionInner.isEmpty()) {
+                // Nothing to verify for this consumer; keep checking the remaining ones.
+                continue;
+            }
+
+            for (SubscriptionData subscriptionData : subscriptionInner) {
+                if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
+                    continue;
+                }
+                // may need to check one broker every cluster...
+                // assume that the configs of every broker in cluster are the same.
+                String addr = findBrokerAddrByTopic(subscriptionData.getTopic());
+                if (addr == null) {
+                    continue;
+                }
+                try {
+                    this.getMQClientAPIImpl().checkClientInBroker(
+                        addr, entry.getKey(), this.clientId, subscriptionData, 3 * 1000);
+                } catch (MQClientException e) {
+                    throw e;
+                } catch (Exception e) {
+                    throw new MQClientException("Check client in broker error, maybe because you use "
+                        + subscriptionData.getExpressionType() + " to filter message, but server has not been upgraded to support!"
+                        + "This error would not affect the launch of consumer, but may has impact on message receiving if you " +
+                        "have use the new features which are not supported by server, please check the log!", e);
+                }
+            }
+        }
+    }
+
     public void sendHeartbeatToAllBrokerWithLock() {
         if (this.lockHeartbeat.tryLock()) {
             try {
@@ -493,7 +534,11 @@ public class MQClientInstance {
                         }
 
                         try {
-                            this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
+                            int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
+                            if (!this.brokerVersionTable.containsKey(brokerName)) {
+                                this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
+                            }
+                            this.brokerVersionTable.get(brokerName).put(addr, version);
                             if (times % 20 == 0) {
                                 log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
                                 log.info(heartbeatData.toString());
@@ -943,7 +988,7 @@ public class MQClientInstance {
         }
 
         if (found) {
-            return new FindBrokerResult(brokerAddr, slave);
+            return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr));
         }
 
         return null;
@@ -982,12 +1027,21 @@ public class MQClientInstance {
         }
 
         if (found) {
-            return new FindBrokerResult(brokerAddr, slave);
+            return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr));
         }
 
         return null;
     }
 
+    /** Returns the version recorded for the broker address, or 0 when none is recorded. */
+    public int findBrokerVersion(String brokerName, String brokerAddr) {
+        // Look each map up exactly once: a containsKey check followed by a separate get
+        // can observe different states and unbox a null.
+        HashMap<String, Integer> versionsByAddr = this.brokerVersionTable.get(brokerName);
+        Integer version = versionsByAddr == null ? null : versionsByAddr.get(brokerAddr);
+        return version == null ? 0 : version;
+    }
+
     public List<String> findConsumerIdList(final String topic, final String group) {
         String brokerAddr = this.findBrokerAddrByTopic(topic);
         if (null == brokerAddr) {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index f79f726..f0a73bd 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -99,6 +99,25 @@ public class BrokerConfig {
 
     private boolean traceOn = true;
 
+    // Switch of filter bit map calculation.
+    // If switch on:
+    // 1. Calculate filter bit map when construct queue.
+    // 2. Filter bit map will be saved to consume queue extend file if allowed.
+    private boolean enableCalcFilterBitMap = false;
+
+    // Expect num of consumers will use filter.
+    private int expectConsumerNumUseFilter = 32;
+
+    // Error rate of bloom filter, 1~100.
+    private int maxErrorRateOfBloomFilter = 20;
+
+    // How long to wait before cleaning filter data once it is dead. Default: 24h.
+    private long filterDataCleanTimeSpan = 24 * 3600 * 1000;
+
+    // Whether to apply the filter when retrying.
+    private boolean filterSupportRetry = false;
+    private boolean enablePropertyFilter = false;
+
     public static String localHostName() {
         try {
             return InetAddress.getLocalHost().getHostName();
@@ -484,4 +503,52 @@ public class BrokerConfig {
     public void setCommercialBaseCount(int commercialBaseCount) {
         this.commercialBaseCount = commercialBaseCount;
     }
+
+    public boolean isEnableCalcFilterBitMap() {
+        return enableCalcFilterBitMap;
+    }
+
+    public void setEnableCalcFilterBitMap(boolean enableCalcFilterBitMap) {
+        this.enableCalcFilterBitMap = enableCalcFilterBitMap;
+    }
+
+    public int getExpectConsumerNumUseFilter() {
+        return expectConsumerNumUseFilter;
+    }
+
+    public void setExpectConsumerNumUseFilter(int expectConsumerNumUseFilter) {
+        this.expectConsumerNumUseFilter = expectConsumerNumUseFilter;
+    }
+
+    public int getMaxErrorRateOfBloomFilter() {
+        return maxErrorRateOfBloomFilter;
+    }
+
+    public void setMaxErrorRateOfBloomFilter(int maxErrorRateOfBloomFilter) {
+        this.maxErrorRateOfBloomFilter = maxErrorRateOfBloomFilter;
+    }
+
+    public long getFilterDataCleanTimeSpan() {
+        return filterDataCleanTimeSpan;
+    }
+
+    public void setFilterDataCleanTimeSpan(long filterDataCleanTimeSpan) {
+        this.filterDataCleanTimeSpan = filterDataCleanTimeSpan;
+    }
+
+    public boolean isFilterSupportRetry() {
+        return filterSupportRetry;
+    }
+
+    public void setFilterSupportRetry(boolean filterSupportRetry) {
+        this.filterSupportRetry = filterSupportRetry;
+    }
+
+    public boolean isEnablePropertyFilter() {
+        return enablePropertyFilter;
+    }
+
+    public void setEnablePropertyFilter(boolean enablePropertyFilter) {
+        this.enablePropertyFilter = enablePropertyFilter;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/common/src/main/java/org/apache/rocketmq/common/MixAll.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
index 4a54a60..e75efd9 100644
--- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
@@ -55,8 +55,8 @@ public class MixAll {
     public static final String DEFAULT_NAMESRV_ADDR_LOOKUP = "jmenv.tbsite.net";
     public static final String WS_DOMAIN_NAME = System.getProperty("rocketmq.namesrv.domain", DEFAULT_NAMESRV_ADDR_LOOKUP);
     public static final String WS_DOMAIN_SUBGROUP = System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr");
-    // http://jmenv.tbsite.net:8080/rocketmq/nsaddr
-    public static final String WS_ADDR = "http://" + WS_DOMAIN_NAME + ":8080/rocketmq/" + WS_DOMAIN_SUBGROUP;
+//    // http://jmenv.tbsite.net:8080/rocketmq/nsaddr
+//    public static final String WS_ADDR = "http://" + WS_DOMAIN_NAME + ":8080/rocketmq/" + WS_DOMAIN_SUBGROUP;
     public static final String DEFAULT_TOPIC = "TBW102";
     public static final String BENCHMARK_TOPIC = "BenchmarkTest";
     public static final String DEFAULT_PRODUCER_GROUP = "DEFAULT_PRODUCER";
@@ -89,6 +89,16 @@ public class MixAll {
     public static final String DEFAULT_TRACE_REGION_ID = "DefaultRegion";
     public static final String CONSUME_CONTEXT_TYPE = "ConsumeContextType";
 
+    /** Builds the "http://.../rocketmq/..." address from the rocketmq.namesrv.domain* system properties. */
+    public static String getWSAddr() {
+        final String domain = System.getProperty("rocketmq.namesrv.domain", DEFAULT_NAMESRV_ADDR_LOOKUP);
+        final String subgroup = System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr");
+        // A domain containing ':' after its first character is used as given (presumably it
+        // already names a port); any other domain gets the default ":8080" appended.
+        final String authority = domain.indexOf(":") > 0 ? domain : domain + ":8080";
+        return "http://" + authority + "/rocketmq/" + subgroup;
+    }
+
     public static String getRetryTopic(final String consumerGroup) {
         return RETRY_GROUP_TOPIC_PREFIX + consumerGroup;
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java b/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java
index e706e28..385c121 100644
--- a/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java
+++ b/common/src/main/java/org/apache/rocketmq/common/constant/LoggerName.java
@@ -34,4 +34,5 @@ public class LoggerName {
     public static final String DUPLICATION_LOGGER_NAME = "RocketmqDuplication";
     public static final String PROTECTION_LOGGER_NAME = "RocketmqProtection";
     public static final String WATER_MARK_LOGGER_NAME = "RocketmqWaterMark";
+    public static final String FILTER_LOGGER_NAME = "RocketmqFilter";
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/common/src/main/java/org/apache/rocketmq/common/filter/ExpressionType.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/filter/ExpressionType.java b/common/src/main/java/org/apache/rocketmq/common/filter/ExpressionType.java
new file mode 100644
index 0000000..3b7940a
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/filter/ExpressionType.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.filter;
+
+/** Names of the expression types a subscription can use to filter messages. */
+public class ExpressionType {
+
+    /**
+     * Expression type for SQL92-based filtering.
+     * <p>Keywords:</p>
+     * <ul>
+     * <li>{@code AND, OR, NOT, BETWEEN, IN, TRUE, FALSE, IS, NULL}</li>
+     * </ul>
+     * <p>Data type:</p>
+     * <ul>
+     * <li>Boolean, like: TRUE, FALSE</li>
+     * <li>String, like: 'abc'</li>
+     * <li>Decimal, like: 123</li>
+     * <li>Float number, like: 3.1415</li>
+     * </ul>
+     * <p>Grammar:</p>
+     * <ul>
+     * <li>{@code AND, OR}</li>
+     * <li>{@code >, >=, <, <=, =}</li>
+     * <li>{@code BETWEEN A AND B}, equals to {@code >=A AND <=B}</li>
+     * <li>{@code NOT BETWEEN A AND B}, equals to {@code >B OR <A}</li>
+     * <li>{@code IN ('a', 'b')}, equals to {@code ='a' OR ='b'}, this operation only supports String type.</li>
+     * <li>{@code IS NULL}, {@code IS NOT NULL}, check whether the parameter is null or not.</li>
+     * <li>{@code =TRUE}, {@code =FALSE}, check whether the parameter is true or false.</li>
+     * </ul>
+     * <p>Example:</p>
+     * <pre>{@code (a > 10 AND a < 100) OR (b IS NOT NULL AND b=TRUE)}</pre>
+     */
+    public static final String SQL92 = "SQL92";
+
+    /**
+     * Only the OR operation is supported, such as
+     * {@code "tag1 || tag2 || tag3"}. <br>
+     * A {@code null} or {@code *} expression means subscribe to all.
+     */
+    public static final String TAG = "TAG";
+
+    /**
+     * Tells whether {@code type} selects tag filtering.
+     *
+     * @param type expression type name; may be {@code null}
+     * @return {@code true} when {@code type} is {@code null} or equals {@link #TAG}
+     */
+    public static boolean isTagType(String type) {
+        return type == null || TAG.equals(type);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9eeb2f7e/common/src/main/java/org/apache/rocketmq/common/filter/FilterAPI.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/filter/FilterAPI.java b/common/src/main/java/org/apache/rocketmq/common/filter/FilterAPI.java
index e9bf3fa..fc8525c 100644
--- a/common/src/main/java/org/apache/rocketmq/common/filter/FilterAPI.java
+++ b/common/src/main/java/org/apache/rocketmq/common/filter/FilterAPI.java
@@ -63,4 +63,22 @@ public class FilterAPI {
 
         return subscriptionData;
     }
+
+    /**
+     * Builds the subscription for a topic: TAG or null types delegate to buildSubscriptionData, others keep subString as-is.
+     */
+    public static SubscriptionData build(final String topic, final String subString,
+                                         final String type) throws Exception {
+        if (ExpressionType.isTagType(type)) {
+            return buildSubscriptionData(null, topic, subString);
+        }
+        if (subString == null || subString.isEmpty()) {
+            throw new IllegalArgumentException("Expression can't be null! " + type);
+        }
+        SubscriptionData data = new SubscriptionData();
+        data.setTopic(topic);
+        data.setSubString(subString);
+        data.setExpressionType(type);
+        return data;
+    }
 }