You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2018/11/27 02:27:45 UTC

[GitHub] vongosling closed pull request #521: [ISSUE#502]Add new feature : pull consumer support sql92 filter.

vongosling closed pull request #521: [ISSUE#502]Add new feature : pull consumer support sql92 filter.
URL: https://github.com/apache/rocketmq/pull/521
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
index cd7067030..6befbf3b5 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
@@ -257,6 +257,18 @@ public PullResult pull(MessageQueue mq, String subExpression, long offset, int m
         return this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums, timeout);
     }
 
+    @Override
+    public PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums)
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        return this.defaultMQPullConsumerImpl.pull(mq, messageSelector, offset, maxNums);
+    }
+
+    @Override
+    public PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums, long timeout)
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        return this.defaultMQPullConsumerImpl.pull(mq, messageSelector, offset, maxNums, timeout);
+    }
+
     @Override
     public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback)
         throws MQClientException, RemotingException, InterruptedException {
@@ -270,6 +282,20 @@ public void pull(MessageQueue mq, String subExpression, long offset, int maxNums
         this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums, pullCallback, timeout);
     }
 
+    @Override
+    public void pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums,
+        PullCallback pullCallback)
+        throws MQClientException, RemotingException, InterruptedException {
+        this.defaultMQPullConsumerImpl.pull(mq, messageSelector, offset, maxNums, pullCallback);
+    }
+
+    @Override
+    public void pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums,
+        PullCallback pullCallback, long timeout)
+        throws MQClientException, RemotingException, InterruptedException {
+        this.defaultMQPullConsumerImpl.pull(mq, messageSelector, offset, maxNums, pullCallback, timeout);
+    }
+
     @Override
     public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums)
         throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
index 33002c983..28b807c2e 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
@@ -66,6 +66,39 @@ PullResult pull(final MessageQueue mq, final String subExpression, final long of
         final int maxNums, final long timeout) throws MQClientException, RemotingException,
         MQBrokerException, InterruptedException;
 
+    /**
+     * Pulling the messages, not blocking
+     * <p>
+     * support other message selection, such as {@link org.apache.rocketmq.common.filter.ExpressionType#SQL92}
+     * </p>
+     *
+     * @param mq from which message queue
+     * @param selector message selector ({@link MessageSelector}); a null selector fails with MQClientException.
+     * @param offset from where to pull
+     * @param maxNums max pulling numbers
+     * @return The resulting {@code PullResult}
+     */
+    PullResult pull(final MessageQueue mq, final MessageSelector selector, final long offset,
+        final int maxNums) throws MQClientException, RemotingException, MQBrokerException,
+        InterruptedException;
+
+    /**
+     * Pulling the messages in the specified timeout
+     * <p>
+     * support other message selection, such as {@link org.apache.rocketmq.common.filter.ExpressionType#SQL92}
+     * </p>
+     *
+     * @param mq from which message queue
+     * @param selector message selector ({@link MessageSelector}); a null selector fails with MQClientException.
+     * @param offset from where to pull
+     * @param maxNums max pulling numbers
+     * @param timeout pull timeout, in milliseconds
+     * @return The resulting {@code PullResult}
+     */
+    PullResult pull(final MessageQueue mq, final MessageSelector selector, final long offset,
+        final int maxNums, final long timeout) throws MQClientException, RemotingException, MQBrokerException,
+        InterruptedException;
+
     /**
      * Pulling the messages in a async. way
      */
@@ -80,6 +113,20 @@ void pull(final MessageQueue mq, final String subExpression, final long offset,
         final PullCallback pullCallback, long timeout) throws MQClientException, RemotingException,
         InterruptedException;
 
+    /**
+     * Pulling the messages in an asynchronous way, with support for message selection.
+     */
+    void pull(final MessageQueue mq, final MessageSelector selector, final long offset, final int maxNums,
+        final PullCallback pullCallback) throws MQClientException, RemotingException,
+        InterruptedException;
+
+    /**
+     * Pulling the messages in an asynchronous way, with support for message selection.
+     */
+    void pull(final MessageQueue mq, final MessageSelector selector, final long offset, final int maxNums,
+        final PullCallback pullCallback, long timeout) throws MQClientException, RemotingException,
+        InterruptedException;
+
     /**
      * Pulling the messages,if no message arrival,blocking some time
      *
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
index 420d89b2f..39c43d592 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
@@ -26,6 +26,7 @@
 import org.apache.rocketmq.client.QueryResult;
 import org.apache.rocketmq.client.Validators;
 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.MessageSelector;
 import org.apache.rocketmq.client.consumer.PullCallback;
 import org.apache.rocketmq.client.consumer.PullResult;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
@@ -46,6 +47,7 @@
 import org.apache.rocketmq.common.ServiceState;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.filter.ExpressionType;
 import org.apache.rocketmq.common.filter.FilterAPI;
 import org.apache.rocketmq.common.help.FAQUrl;
 import org.apache.rocketmq.logging.InternalLogger;
@@ -158,17 +160,58 @@ public PullResult pull(MessageQueue mq, String subExpression, long offset, int m
 
     public PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums, long timeout)
         throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
-        return this.pullSyncImpl(mq, subExpression, offset, maxNums, false, timeout);
+        SubscriptionData subscriptionData = getSubscriptionData(mq, subExpression);
+        return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, false, timeout);
     }
 
-    private PullResult pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block,
+    public PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums)
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        return pull(mq, messageSelector, offset, maxNums, this.defaultMQPullConsumer.getConsumerPullTimeoutMillis());
+    }
+
+    public PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums, long timeout)
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        SubscriptionData subscriptionData = getSubscriptionData(mq, messageSelector);
+        return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, false, timeout);
+    }
+
+    private SubscriptionData getSubscriptionData(MessageQueue mq, String subExpression)
+        throws MQClientException {
+
+        if (null == mq) {
+            throw new MQClientException("mq is null", null);
+        }
+
+        try {
+            return FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),
+                mq.getTopic(), subExpression);
+        } catch (Exception e) {
+            throw new MQClientException("parse subscription error", e);
+        }
+    }
+
+    private SubscriptionData getSubscriptionData(MessageQueue mq, MessageSelector messageSelector)
+        throws MQClientException {
+
+        if (null == mq) {
+            throw new MQClientException("mq is null", null);
+        }
+
+        try {
+            return FilterAPI.build(mq.getTopic(),
+                messageSelector.getExpression(), messageSelector.getExpressionType());
+        } catch (Exception e) {
+            throw new MQClientException("parse subscription error", e);
+        }
+    }
+
+    private PullResult pullSyncImpl(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums, boolean block,
         long timeout)
         throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
         this.makeSureStateOK();
 
         if (null == mq) {
             throw new MQClientException("mq is null", null);
-
         }
 
         if (offset < 0) {
@@ -183,20 +226,14 @@ private PullResult pullSyncImpl(MessageQueue mq, String subExpression, long offs
 
         int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
 
-        SubscriptionData subscriptionData;
-        try {
-            subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),
-                mq.getTopic(), subExpression);
-        } catch (Exception e) {
-            throw new MQClientException("parse subscription error", e);
-        }
-
         long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
 
+        boolean isTagType = ExpressionType.isTagType(subscriptionData.getExpressionType());
         PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(
             mq,
             subscriptionData.getSubString(),
-            0L,
+            subscriptionData.getExpressionType(),
+            isTagType ? 0L : subscriptionData.getSubVersion(),
             offset,
             maxNums,
             sysFlag,
@@ -369,12 +406,27 @@ public void pull(MessageQueue mq, String subExpression, long offset, int maxNums
     public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback,
         long timeout)
         throws MQClientException, RemotingException, InterruptedException {
-        this.pullAsyncImpl(mq, subExpression, offset, maxNums, pullCallback, false, timeout);
+        SubscriptionData subscriptionData = getSubscriptionData(mq, subExpression);
+        this.pullAsyncImpl(mq, subscriptionData, offset, maxNums, pullCallback, false, timeout);
+    }
+
+    public void pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums,
+        PullCallback pullCallback)
+        throws MQClientException, RemotingException, InterruptedException {
+        pull(mq, messageSelector, offset, maxNums, pullCallback, this.defaultMQPullConsumer.getConsumerPullTimeoutMillis());
+    }
+
+    public void pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums,
+        PullCallback pullCallback,
+        long timeout)
+        throws MQClientException, RemotingException, InterruptedException {
+        SubscriptionData subscriptionData = getSubscriptionData(mq, messageSelector);
+        this.pullAsyncImpl(mq, subscriptionData, offset, maxNums, pullCallback, false, timeout);
     }
 
     private void pullAsyncImpl(
         final MessageQueue mq,
-        final String subExpression,
+        final SubscriptionData subscriptionData,
         final long offset,
         final int maxNums,
         final PullCallback pullCallback,
@@ -403,20 +455,14 @@ private void pullAsyncImpl(
         try {
             int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
 
-            final SubscriptionData subscriptionData;
-            try {
-                subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),
-                    mq.getTopic(), subExpression);
-            } catch (Exception e) {
-                throw new MQClientException("parse subscription error", e);
-            }
-
             long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
 
+            boolean isTagType = ExpressionType.isTagType(subscriptionData.getExpressionType());
             this.pullAPIWrapper.pullKernelImpl(
                 mq,
                 subscriptionData.getSubString(),
-                0L,
+                subscriptionData.getExpressionType(),
+                isTagType ? 0L : subscriptionData.getSubVersion(),
                 offset,
                 maxNums,
                 sysFlag,
@@ -444,7 +490,8 @@ public void onException(Throwable e) {
 
     public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums)
         throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
-        return this.pullSyncImpl(mq, subExpression, offset, maxNums, true, this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
+        SubscriptionData subscriptionData = getSubscriptionData(mq, subExpression);
+        return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, true, this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
     }
 
     public DefaultMQPullConsumer getDefaultMQPullConsumer() {
@@ -454,7 +501,8 @@ public DefaultMQPullConsumer getDefaultMQPullConsumer() {
     public void pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums,
         PullCallback pullCallback)
         throws MQClientException, RemotingException, InterruptedException {
-        this.pullAsyncImpl(mq, subExpression, offset, maxNums, pullCallback, true,
+        SubscriptionData subscriptionData = getSubscriptionData(mq, subExpression);
+        this.pullAsyncImpl(mq, subscriptionData, offset, maxNums, pullCallback, true,
             this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
     }
 
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
index b650e35e0..1d2d24fa3 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
@@ -209,34 +209,6 @@ public PullResult pullKernelImpl(
         throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
     }
 
-    public PullResult pullKernelImpl(
-        final MessageQueue mq,
-        final String subExpression,
-        final long subVersion,
-        final long offset,
-        final int maxNums,
-        final int sysFlag,
-        final long commitOffset,
-        final long brokerSuspendMaxTimeMillis,
-        final long timeoutMillis,
-        final CommunicationMode communicationMode,
-        final PullCallback pullCallback
-    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
-        return pullKernelImpl(
-            mq,
-            subExpression,
-            ExpressionType.TAG,
-            subVersion, offset,
-            maxNums,
-            sysFlag,
-            commitOffset,
-            brokerSuspendMaxTimeMillis,
-            timeoutMillis,
-            communicationMode,
-            pullCallback
-        );
-    }
-
     public long recalculatePullFromWhichNode(final MessageQueue mq) {
         if (this.isConnectBrokerByUser()) {
             return this.defaultBrokerId;
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 1b3885629..2a48dcece 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -1042,6 +1042,19 @@ public int findBrokerVersion(String brokerName, String brokerAddr) {
             if (this.brokerVersionTable.get(brokerName).containsKey(brokerAddr)) {
                 return this.brokerVersionTable.get(brokerName).get(brokerAddr);
             }
+        } else {
+            HeartbeatData heartbeatData = prepareHeartbeatData();
+            try {
+                int version = this.mQClientAPIImpl.sendHearbeat(brokerAddr, heartbeatData, 3000);
+                return version;
+            } catch (Exception e) {
+                if (this.isBrokerInNameServer(brokerAddr)) {
+                    log.info("send heart beat to broker[{} {}] failed", brokerName, brokerAddr);
+                } else {
+                    log.info("send heart beat to broker[{} {}] exception, because the broker not up, forget it", brokerName,
+                        brokerAddr);
+                }
+            }
         }
         return 0;
     }
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java
index 79f15dcc9..a0f6555ce 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java
@@ -17,12 +17,18 @@
 
 package org.apache.rocketmq.test.client.consumer.filter;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import org.apache.log4j.Logger;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
 import org.apache.rocketmq.client.consumer.MessageSelector;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.test.base.BaseConf;
-import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT;
-import org.apache.rocketmq.test.client.consumer.broadcast.normal.NormalMsgTwoSameGroupConsumerIT;
-import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
 import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
 import org.apache.rocketmq.test.client.rmq.RMQSqlConsumer;
 import org.apache.rocketmq.test.factory.ConsumerFactory;
@@ -39,12 +45,14 @@
     private static Logger logger = Logger.getLogger(SqlFilterIT.class);
     private RMQNormalProducer producer = null;
     private String topic = null;
+    private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();
 
     @Before
     public void setUp() {
         topic = initTopic();
         logger.info(String.format("use topic: %s;", topic));
         producer = getProducer(nsAddr, topic);
+        OFFSE_TABLE.clear();
     }
 
     @After
@@ -71,4 +79,65 @@ public void testFilterConsumer() throws Exception {
 
         assertThat(consumer.getListener().getAllMsgBody().size()).isEqualTo(msgSize * 2);
     }
+
+    @Test
+    public void testFilterPullConsumer() throws Exception {
+        int msgSize = 16;
+
+        String group = initConsumerGroup();
+        MessageSelector selector = MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))");
+        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(group);
+        consumer.setNamesrvAddr(nsAddr);
+        consumer.start();
+        Thread.sleep(3000);
+        producer.send("TagA", msgSize);
+        producer.send("TagB", msgSize);
+        producer.send("TagC", msgSize);
+        Assert.assertEquals("Not all sent succeeded", msgSize * 3, producer.getAllUndupMsgBody().size());
+
+        List<String> receivedMessage = new ArrayList<>(2);
+        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(topic);
+        for (MessageQueue mq : mqs) {
+            SINGLE_MQ:
+            while (true) {
+                try {
+                    PullResult pullResult =
+                        consumer.pull(mq, selector, getMessageQueueOffset(mq), 32);
+                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
+                    switch (pullResult.getPullStatus()) {
+                        case FOUND:
+                            List<MessageExt> msgs = pullResult.getMsgFoundList();
+                            for (MessageExt msg : msgs) {
+                                receivedMessage.add(new String(msg.getBody()));
+                            }
+                            break;
+                        case NO_MATCHED_MSG:
+                            break;
+                        case NO_NEW_MSG:
+                            break SINGLE_MQ;
+                        case OFFSET_ILLEGAL:
+                            break;
+                        default:
+                            break;
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+
+        assertThat(receivedMessage.size()).isEqualTo(msgSize * 2);
+    }
+
+    private static long getMessageQueueOffset(MessageQueue mq) {
+        Long offset = OFFSE_TABLE.get(mq);
+        if (offset != null)
+            return offset;
+
+        return 0;
+    }
+
+    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
+        OFFSE_TABLE.put(mq, offset);
+    }
 }
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index bcd66669c..2a7815b61 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -842,7 +842,7 @@ public boolean consumed(final MessageExt msg,
                 BrokerData brokerData = ci.getBrokerAddrTable().get(mq.getBrokerName());
                 if (brokerData != null) {
                     String addr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
-                    if (addr.equals(RemotingUtil.socketAddress2String(msg.getStoreHost()))) {
+                    if (RemotingUtil.socketAddress2String(msg.getStoreHost()).equals(addr)) {
                         if (next.getValue().getConsumerOffset() > msg.getQueueOffset()) {
                             return true;
                         }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services