You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vo...@apache.org on 2018/11/27 02:27:48 UTC

[rocketmq] branch develop updated (3c524c2 -> 82a5961)

This is an automated email from the ASF dual-hosted git repository.

vongosling pushed a change to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git.


    from 3c524c2  fix comment
     new 3530b76  MQPullConsumer support MessageSelector
     new 11bb267  Make tag type use 0 as subVersion
     new 82a5961  Add integration test case

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../client/consumer/DefaultMQPullConsumer.java     | 26 ++++++
 .../rocketmq/client/consumer/MQPullConsumer.java   | 47 +++++++++++
 .../impl/consumer/DefaultMQPullConsumerImpl.java   | 98 ++++++++++++++++------
 .../client/impl/consumer/PullAPIWrapper.java       | 28 -------
 .../client/impl/factory/MQClientInstance.java      | 13 +++
 .../test/client/consumer/filter/SqlFilterIT.java   | 75 ++++++++++++++++-
 6 files changed, 231 insertions(+), 56 deletions(-)


[rocketmq] 01/03: MQPullConsumer support MessageSelector

Posted by vo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vongosling pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 3530b76e7bd1a493ac302948378b7d080430e2ec
Author: maowei.ymw <ma...@alibaba-inc.com>
AuthorDate: Fri Nov 2 17:03:04 2018 +0800

    MQPullConsumer support MessageSelector
---
 .../client/consumer/DefaultMQPullConsumer.java     | 26 ++++++
 .../rocketmq/client/consumer/MQPullConsumer.java   | 47 +++++++++++
 .../impl/consumer/DefaultMQPullConsumerImpl.java   | 95 ++++++++++++++++------
 .../client/impl/consumer/PullAPIWrapper.java       | 29 +------
 .../client/impl/factory/MQClientInstance.java      | 13 +++
 5 files changed, 157 insertions(+), 53 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
index cd70670..6befbf3 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
@@ -258,6 +258,18 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
     }
 
     @Override
+    public PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums)
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        return this.defaultMQPullConsumerImpl.pull(mq, messageSelector, offset, maxNums);
+    }
+
+    @Override
+    public PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums, long timeout)
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        return this.defaultMQPullConsumerImpl.pull(mq, messageSelector, offset, maxNums, timeout);
+    }
+
+    @Override
     public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback)
         throws MQClientException, RemotingException, InterruptedException {
         this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums, pullCallback);
@@ -271,6 +283,20 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
     }
 
     @Override
+    public void pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums,
+        PullCallback pullCallback)
+        throws MQClientException, RemotingException, InterruptedException {
+        this.defaultMQPullConsumerImpl.pull(mq, messageSelector, offset, maxNums, pullCallback);
+    }
+
+    @Override
+    public void pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums,
+        PullCallback pullCallback, long timeout)
+        throws MQClientException, RemotingException, InterruptedException {
+        this.defaultMQPullConsumerImpl.pull(mq, messageSelector, offset, maxNums, pullCallback, timeout);
+    }
+
+    @Override
     public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums)
         throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
         return this.defaultMQPullConsumerImpl.pullBlockIfNotFound(mq, subExpression, offset, maxNums);
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
index 33002c9..28b807c 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
@@ -67,6 +67,39 @@ public interface MQPullConsumer extends MQConsumer {
         MQBrokerException, InterruptedException;
 
     /**
+     * Pulls messages without blocking.
+     * <p>
+     * Supports other message selection types, such as {@link org.apache.rocketmq.common.filter.ExpressionType#SQL92}.
+     * </p>
+     *
+     * @param mq from which message queue
+     * @param selector message selector({@link MessageSelector}), can be null.
+     * @param offset from where to pull
+     * @param maxNums max pulling numbers
+     * @return The resulting {@code PullResult}
+     */
+    PullResult pull(final MessageQueue mq, final MessageSelector selector, final long offset,
+        final int maxNums) throws MQClientException, RemotingException, MQBrokerException,
+        InterruptedException;
+
+    /**
+     * Pulls messages within the specified timeout.
+     * <p>
+     * Supports other message selection types, such as {@link org.apache.rocketmq.common.filter.ExpressionType#SQL92}.
+     * </p>
+     *
+     * @param mq from which message queue
+     * @param selector message selector({@link MessageSelector}), can be null.
+     * @param offset from where to pull
+     * @param maxNums max pulling numbers
+     * @param timeout timeout for the pull, in milliseconds
+     * @return The resulting {@code PullResult}
+     */
+    PullResult pull(final MessageQueue mq, final MessageSelector selector, final long offset,
+        final int maxNums, final long timeout) throws MQClientException, RemotingException, MQBrokerException,
+        InterruptedException;
+
+    /**
      * Pulling the messages in a async. way
      */
     void pull(final MessageQueue mq, final String subExpression, final long offset, final int maxNums,
@@ -81,6 +114,20 @@ public interface MQPullConsumer extends MQConsumer {
         InterruptedException;
 
     /**
+     * Pulls messages asynchronously, with support for message selection.
+     */
+    void pull(final MessageQueue mq, final MessageSelector selector, final long offset, final int maxNums,
+        final PullCallback pullCallback) throws MQClientException, RemotingException,
+        InterruptedException;
+
+    /**
+     * Pulls messages asynchronously within the specified timeout, with support for message selection.
+     */
+    void pull(final MessageQueue mq, final MessageSelector selector, final long offset, final int maxNums,
+        final PullCallback pullCallback, long timeout) throws MQClientException, RemotingException,
+        InterruptedException;
+
+    /**
      * Pulling the messages,if no message arrival,blocking some time
      *
      * @return The resulting {@code PullRequest}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
index 420d89b..1804df2 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
@@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentMap;
 import org.apache.rocketmq.client.QueryResult;
 import org.apache.rocketmq.client.Validators;
 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.MessageSelector;
 import org.apache.rocketmq.client.consumer.PullCallback;
 import org.apache.rocketmq.client.consumer.PullResult;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
@@ -158,17 +159,58 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
 
     public PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums, long timeout)
         throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
-        return this.pullSyncImpl(mq, subExpression, offset, maxNums, false, timeout);
+        SubscriptionData subscriptionData = getSubscriptionData(mq, subExpression);
+        return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, false, timeout);
     }
 
-    private PullResult pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block,
+    public PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums)
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        return pull(mq, messageSelector, offset, maxNums, this.defaultMQPullConsumer.getConsumerPullTimeoutMillis());
+    }
+
+    public PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums, long timeout)
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        SubscriptionData subscriptionData = getSubscriptionData(mq, messageSelector);
+        return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, false, timeout);
+    }
+
+    private SubscriptionData getSubscriptionData(MessageQueue mq, String subExpression)
+        throws MQClientException {
+
+        if (null == mq) {
+            throw new MQClientException("mq is null", null);
+        }
+
+        try {
+            return FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),
+                mq.getTopic(), subExpression);
+        } catch (Exception e) {
+            throw new MQClientException("parse subscription error", e);
+        }
+    }
+
+    private SubscriptionData getSubscriptionData(MessageQueue mq, MessageSelector messageSelector)
+        throws MQClientException {
+
+        if (null == mq) {
+            throw new MQClientException("mq is null", null);
+        }
+
+        try {
+            return FilterAPI.build(mq.getTopic(),
+                messageSelector.getExpression(), messageSelector.getExpressionType());
+        } catch (Exception e) {
+            throw new MQClientException("parse subscription error", e);
+        }
+    }
+
+    private PullResult pullSyncImpl(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums, boolean block,
         long timeout)
         throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
         this.makeSureStateOK();
 
         if (null == mq) {
             throw new MQClientException("mq is null", null);
-
         }
 
         if (offset < 0) {
@@ -183,20 +225,13 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
 
         int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
 
-        SubscriptionData subscriptionData;
-        try {
-            subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),
-                mq.getTopic(), subExpression);
-        } catch (Exception e) {
-            throw new MQClientException("parse subscription error", e);
-        }
-
         long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
 
         PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(
             mq,
             subscriptionData.getSubString(),
-            0L,
+            subscriptionData.getExpressionType(),
+            subscriptionData.getSubVersion(),
             offset,
             maxNums,
             sysFlag,
@@ -369,12 +404,27 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
     public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback,
         long timeout)
         throws MQClientException, RemotingException, InterruptedException {
-        this.pullAsyncImpl(mq, subExpression, offset, maxNums, pullCallback, false, timeout);
+        SubscriptionData subscriptionData = getSubscriptionData(mq, subExpression);
+        this.pullAsyncImpl(mq, subscriptionData, offset, maxNums, pullCallback, false, timeout);
+    }
+
+    public void pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums,
+        PullCallback pullCallback)
+        throws MQClientException, RemotingException, InterruptedException {
+        pull(mq, messageSelector, offset, maxNums, pullCallback, this.defaultMQPullConsumer.getConsumerPullTimeoutMillis());
+    }
+
+    public void pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums,
+        PullCallback pullCallback,
+        long timeout)
+        throws MQClientException, RemotingException, InterruptedException {
+        SubscriptionData subscriptionData = getSubscriptionData(mq, messageSelector);
+        this.pullAsyncImpl(mq, subscriptionData, offset, maxNums, pullCallback, false, timeout);
     }
 
     private void pullAsyncImpl(
         final MessageQueue mq,
-        final String subExpression,
+        final SubscriptionData subscriptionData,
         final long offset,
         final int maxNums,
         final PullCallback pullCallback,
@@ -403,20 +453,13 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
         try {
             int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
 
-            final SubscriptionData subscriptionData;
-            try {
-                subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),
-                    mq.getTopic(), subExpression);
-            } catch (Exception e) {
-                throw new MQClientException("parse subscription error", e);
-            }
-
             long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
 
             this.pullAPIWrapper.pullKernelImpl(
                 mq,
                 subscriptionData.getSubString(),
-                0L,
+                subscriptionData.getExpressionType(),
+                subscriptionData.getSubVersion(),
                 offset,
                 maxNums,
                 sysFlag,
@@ -444,7 +487,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
 
     public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums)
         throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
-        return this.pullSyncImpl(mq, subExpression, offset, maxNums, true, this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
+        SubscriptionData subscriptionData = getSubscriptionData(mq, subExpression);
+        return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, true, this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
     }
 
     public DefaultMQPullConsumer getDefaultMQPullConsumer() {
@@ -454,7 +498,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
     public void pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums,
         PullCallback pullCallback)
         throws MQClientException, RemotingException, InterruptedException {
-        this.pullAsyncImpl(mq, subExpression, offset, maxNums, pullCallback, true,
+        SubscriptionData subscriptionData = getSubscriptionData(mq, subExpression);
+        this.pullAsyncImpl(mq, subscriptionData, offset, maxNums, pullCallback, true,
             this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
     }
 
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
index b650e35..ac44df4 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
@@ -163,6 +163,7 @@ public class PullAPIWrapper {
                     this.recalculatePullFromWhichNode(mq), false);
         }
 
+
         if (findBrokerResult != null) {
             {
                 // check version
@@ -209,34 +210,6 @@ public class PullAPIWrapper {
         throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
     }
 
-    public PullResult pullKernelImpl(
-        final MessageQueue mq,
-        final String subExpression,
-        final long subVersion,
-        final long offset,
-        final int maxNums,
-        final int sysFlag,
-        final long commitOffset,
-        final long brokerSuspendMaxTimeMillis,
-        final long timeoutMillis,
-        final CommunicationMode communicationMode,
-        final PullCallback pullCallback
-    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
-        return pullKernelImpl(
-            mq,
-            subExpression,
-            ExpressionType.TAG,
-            subVersion, offset,
-            maxNums,
-            sysFlag,
-            commitOffset,
-            brokerSuspendMaxTimeMillis,
-            timeoutMillis,
-            communicationMode,
-            pullCallback
-        );
-    }
-
     public long recalculatePullFromWhichNode(final MessageQueue mq) {
         if (this.isConnectBrokerByUser()) {
             return this.defaultBrokerId;
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 9ffaed0..5172211 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -1046,6 +1046,19 @@ public class MQClientInstance {
             if (this.brokerVersionTable.get(brokerName).containsKey(brokerAddr)) {
                 return this.brokerVersionTable.get(brokerName).get(brokerAddr);
             }
+        }else{
+            HeartbeatData heartbeatData = prepareHeartbeatData();
+            try {
+                int version = this.mQClientAPIImpl.sendHearbeat(brokerAddr, heartbeatData, 3000);
+                return version;
+            } catch (Exception e) {
+                if (this.isBrokerInNameServer(brokerAddr)) {
+                    log.info("send heart beat to broker[{} {}] failed", brokerName, brokerAddr);
+                } else {
+                    log.info("send heart beat to broker[{} {}] exception, because the broker not up, forget it", brokerName,
+                        brokerAddr);
+                }
+            }
         }
         return 0;
     }


[rocketmq] 02/03: Make tag type use 0 as subVersion

Posted by vo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vongosling pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 11bb2672687266375acdea7dbd955c00b2e760c4
Author: maowei.ymw <ma...@alibaba-inc.com>
AuthorDate: Fri Nov 2 17:17:14 2018 +0800

    Make tag type use 0 as subVersion
---
 .../rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java   | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
index 1804df2..39c43d5 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
@@ -47,6 +47,7 @@ import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.ServiceState;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.filter.ExpressionType;
 import org.apache.rocketmq.common.filter.FilterAPI;
 import org.apache.rocketmq.common.help.FAQUrl;
 import org.apache.rocketmq.logging.InternalLogger;
@@ -227,11 +228,12 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
 
         long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
 
+        boolean isTagType = ExpressionType.isTagType(subscriptionData.getExpressionType());
         PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(
             mq,
             subscriptionData.getSubString(),
             subscriptionData.getExpressionType(),
-            subscriptionData.getSubVersion(),
+            isTagType ? 0L : subscriptionData.getSubVersion(),
             offset,
             maxNums,
             sysFlag,
@@ -455,11 +457,12 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
 
             long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
 
+            boolean isTagType = ExpressionType.isTagType(subscriptionData.getExpressionType());
             this.pullAPIWrapper.pullKernelImpl(
                 mq,
                 subscriptionData.getSubString(),
                 subscriptionData.getExpressionType(),
-                subscriptionData.getSubVersion(),
+                isTagType ? 0L : subscriptionData.getSubVersion(),
                 offset,
                 maxNums,
                 sysFlag,


[rocketmq] 03/03: Add integration test case

Posted by vo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vongosling pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 82a59610b8c53edd8713ea04e775222ba277dbfd
Author: maowei.ymw <ma...@alibaba-inc.com>
AuthorDate: Tue Nov 6 21:59:15 2018 +0800

    Add integration test case
---
 .../client/impl/consumer/PullAPIWrapper.java       |  1 -
 .../client/impl/factory/MQClientInstance.java      |  2 +-
 .../test/client/consumer/filter/SqlFilterIT.java   | 75 +++++++++++++++++++++-
 3 files changed, 73 insertions(+), 5 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
index ac44df4..1d2d24f 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
@@ -163,7 +163,6 @@ public class PullAPIWrapper {
                     this.recalculatePullFromWhichNode(mq), false);
         }
 
-
         if (findBrokerResult != null) {
             {
                 // check version
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 5172211..80347d1 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -1046,7 +1046,7 @@ public class MQClientInstance {
             if (this.brokerVersionTable.get(brokerName).containsKey(brokerAddr)) {
                 return this.brokerVersionTable.get(brokerName).get(brokerAddr);
             }
-        }else{
+        } else {
             HeartbeatData heartbeatData = prepareHeartbeatData();
             try {
                 int version = this.mQClientAPIImpl.sendHearbeat(brokerAddr, heartbeatData, 3000);
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java
index 79f15dc..a0f6555 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java
@@ -17,12 +17,18 @@
 
 package org.apache.rocketmq.test.client.consumer.filter;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import org.apache.log4j.Logger;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
 import org.apache.rocketmq.client.consumer.MessageSelector;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.test.base.BaseConf;
-import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT;
-import org.apache.rocketmq.test.client.consumer.broadcast.normal.NormalMsgTwoSameGroupConsumerIT;
-import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
 import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
 import org.apache.rocketmq.test.client.rmq.RMQSqlConsumer;
 import org.apache.rocketmq.test.factory.ConsumerFactory;
@@ -39,12 +45,14 @@ public class SqlFilterIT extends BaseConf {
     private static Logger logger = Logger.getLogger(SqlFilterIT.class);
     private RMQNormalProducer producer = null;
     private String topic = null;
+    private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();
 
     @Before
     public void setUp() {
         topic = initTopic();
         logger.info(String.format("use topic: %s;", topic));
         producer = getProducer(nsAddr, topic);
+        OFFSE_TABLE.clear();
     }
 
     @After
@@ -71,4 +79,65 @@ public class SqlFilterIT extends BaseConf {
 
         assertThat(consumer.getListener().getAllMsgBody().size()).isEqualTo(msgSize * 2);
     }
+
+    @Test
+    public void testFilterPullConsumer() throws Exception {
+        int msgSize = 16;
+
+        String group = initConsumerGroup();
+        MessageSelector selector = MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))");
+        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(group);
+        consumer.setNamesrvAddr(nsAddr);
+        consumer.start();
+        Thread.sleep(3000);
+        producer.send("TagA", msgSize);
+        producer.send("TagB", msgSize);
+        producer.send("TagC", msgSize);
+        Assert.assertEquals("Not all sent succeeded", msgSize * 3, producer.getAllUndupMsgBody().size());
+
+        List<String> receivedMessage = new ArrayList<>(2);
+        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(topic);
+        for (MessageQueue mq : mqs) {
+            SINGLE_MQ:
+            while (true) {
+                try {
+                    PullResult pullResult =
+                        consumer.pull(mq, selector, getMessageQueueOffset(mq), 32);
+                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
+                    switch (pullResult.getPullStatus()) {
+                        case FOUND:
+                            List<MessageExt> msgs = pullResult.getMsgFoundList();
+                            for (MessageExt msg : msgs) {
+                                receivedMessage.add(new String(msg.getBody()));
+                            }
+                            break;
+                        case NO_MATCHED_MSG:
+                            break;
+                        case NO_NEW_MSG:
+                            break SINGLE_MQ;
+                        case OFFSET_ILLEGAL:
+                            break;
+                        default:
+                            break;
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+
+        assertThat(receivedMessage.size()).isEqualTo(msgSize * 2);
+    }
+
+    private static long getMessageQueueOffset(MessageQueue mq) {
+        Long offset = OFFSE_TABLE.get(mq);
+        if (offset != null)
+            return offset;
+
+        return 0;
+    }
+
+    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
+        OFFSE_TABLE.put(mq, offset);
+    }
 }