You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2018/11/27 12:13:34 UTC

[GitHub] walking98 closed pull request #564: Develop

walking98 closed pull request #564: Develop
URL: https://github.com/apache/rocketmq/pull/564
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index b7e7a6187..c696614ee 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -356,6 +356,12 @@ private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
                         + "] sending transaction message is forbidden");
                 return response;
             }
+            if (msgInner.getDelayTimeLevel() > 0) {
+                response.setCode(ResponseCode.NO_PERMISSION);
+                response.setRemark(
+                    "delay message is not supported when sending transaction message");
+                return response;
+            }
             putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
         } else {
             putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
index cd7067030..6befbf3b5 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
@@ -257,6 +257,18 @@ public PullResult pull(MessageQueue mq, String subExpression, long offset, int m
         return this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums, timeout);
     }
 
+    @Override
+    public PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums)
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        return this.defaultMQPullConsumerImpl.pull(mq, messageSelector, offset, maxNums);
+    }
+
+    @Override
+    public PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums, long timeout)
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        return this.defaultMQPullConsumerImpl.pull(mq, messageSelector, offset, maxNums, timeout);
+    }
+
     @Override
     public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback)
         throws MQClientException, RemotingException, InterruptedException {
@@ -270,6 +282,20 @@ public void pull(MessageQueue mq, String subExpression, long offset, int maxNums
         this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums, pullCallback, timeout);
     }
 
+    @Override
+    public void pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums,
+        PullCallback pullCallback)
+        throws MQClientException, RemotingException, InterruptedException {
+        this.defaultMQPullConsumerImpl.pull(mq, messageSelector, offset, maxNums, pullCallback);
+    }
+
+    @Override
+    public void pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums,
+        PullCallback pullCallback, long timeout)
+        throws MQClientException, RemotingException, InterruptedException {
+        this.defaultMQPullConsumerImpl.pull(mq, messageSelector, offset, maxNums, pullCallback, timeout);
+    }
+
     @Override
     public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums)
         throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
index 33002c983..28b807c2e 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
@@ -66,6 +66,39 @@ PullResult pull(final MessageQueue mq, final String subExpression, final long of
         final int maxNums, final long timeout) throws MQClientException, RemotingException,
         MQBrokerException, InterruptedException;
 
+    /**
+     * Pulling the messages, not blocking
+     * <p>
+     * support other message selection, such as {@link org.apache.rocketmq.common.filter.ExpressionType#SQL92}
+     * </p>
+     *
+     * @param mq from which message queue
+     * @param selector message selector({@link MessageSelector}), can be null.
+     * @param offset from where to pull
+     * @param maxNums max pulling numbers
+     * @return The resulting {@code PullResult}
+     */
+    PullResult pull(final MessageQueue mq, final MessageSelector selector, final long offset,
+        final int maxNums) throws MQClientException, RemotingException, MQBrokerException,
+        InterruptedException;
+
+    /**
+     * Pulling the messages in the specified timeout
+     * <p>
+     * support other message selection, such as {@link org.apache.rocketmq.common.filter.ExpressionType#SQL92}
+     * </p>
+     *
+     * @param mq from which message queue
+     * @param selector message selector({@link MessageSelector}), can be null.
+     * @param offset from where to pull
+     * @param maxNums max pulling numbers
+     * @param timeout maximum time to wait for the pull to complete, in milliseconds
+     * @return The resulting {@code PullResult}
+     */
+    PullResult pull(final MessageQueue mq, final MessageSelector selector, final long offset,
+        final int maxNums, final long timeout) throws MQClientException, RemotingException, MQBrokerException,
+        InterruptedException;
+
     /**
      * Pulling the messages in a async. way
      */
@@ -80,6 +113,20 @@ void pull(final MessageQueue mq, final String subExpression, final long offset,
         final PullCallback pullCallback, long timeout) throws MQClientException, RemotingException,
         InterruptedException;
 
+    /**
+     * Pulling the messages in an async way. Supports message selection.
+     */
+    void pull(final MessageQueue mq, final MessageSelector selector, final long offset, final int maxNums,
+        final PullCallback pullCallback) throws MQClientException, RemotingException,
+        InterruptedException;
+
+    /**
+     * Pulling the messages in an async way within the specified timeout. Supports message selection.
+     */
+    void pull(final MessageQueue mq, final MessageSelector selector, final long offset, final int maxNums,
+        final PullCallback pullCallback, long timeout) throws MQClientException, RemotingException,
+        InterruptedException;
+
     /**
      * Pulling the messages,if no message arrival,blocking some time
      *
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
index 420d89b2f..39c43d592 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
@@ -26,6 +26,7 @@
 import org.apache.rocketmq.client.QueryResult;
 import org.apache.rocketmq.client.Validators;
 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.MessageSelector;
 import org.apache.rocketmq.client.consumer.PullCallback;
 import org.apache.rocketmq.client.consumer.PullResult;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
@@ -46,6 +47,7 @@
 import org.apache.rocketmq.common.ServiceState;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.filter.ExpressionType;
 import org.apache.rocketmq.common.filter.FilterAPI;
 import org.apache.rocketmq.common.help.FAQUrl;
 import org.apache.rocketmq.logging.InternalLogger;
@@ -158,17 +160,58 @@ public PullResult pull(MessageQueue mq, String subExpression, long offset, int m
 
     public PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums, long timeout)
         throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
-        return this.pullSyncImpl(mq, subExpression, offset, maxNums, false, timeout);
+        SubscriptionData subscriptionData = getSubscriptionData(mq, subExpression);
+        return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, false, timeout);
     }
 
-    private PullResult pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block,
+    public PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums)
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        return pull(mq, messageSelector, offset, maxNums, this.defaultMQPullConsumer.getConsumerPullTimeoutMillis());
+    }
+
+    public PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums, long timeout)
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        SubscriptionData subscriptionData = getSubscriptionData(mq, messageSelector);
+        return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, false, timeout);
+    }
+
+    private SubscriptionData getSubscriptionData(MessageQueue mq, String subExpression)
+        throws MQClientException {
+
+        if (null == mq) {
+            throw new MQClientException("mq is null", null);
+        }
+
+        try {
+            return FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),
+                mq.getTopic(), subExpression);
+        } catch (Exception e) {
+            throw new MQClientException("parse subscription error", e);
+        }
+    }
+
+    private SubscriptionData getSubscriptionData(MessageQueue mq, MessageSelector messageSelector)
+        throws MQClientException {
+
+        if (null == mq) {
+            throw new MQClientException("mq is null", null);
+        }
+
+        try {
+            return FilterAPI.build(mq.getTopic(),
+                messageSelector.getExpression(), messageSelector.getExpressionType());
+        } catch (Exception e) {
+            throw new MQClientException("parse subscription error", e);
+        }
+    }
+
+    private PullResult pullSyncImpl(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums, boolean block,
         long timeout)
         throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
         this.makeSureStateOK();
 
         if (null == mq) {
             throw new MQClientException("mq is null", null);
-
         }
 
         if (offset < 0) {
@@ -183,20 +226,14 @@ private PullResult pullSyncImpl(MessageQueue mq, String subExpression, long offs
 
         int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
 
-        SubscriptionData subscriptionData;
-        try {
-            subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),
-                mq.getTopic(), subExpression);
-        } catch (Exception e) {
-            throw new MQClientException("parse subscription error", e);
-        }
-
         long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
 
+        boolean isTagType = ExpressionType.isTagType(subscriptionData.getExpressionType());
         PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(
             mq,
             subscriptionData.getSubString(),
-            0L,
+            subscriptionData.getExpressionType(),
+            isTagType ? 0L : subscriptionData.getSubVersion(),
             offset,
             maxNums,
             sysFlag,
@@ -369,12 +406,27 @@ public void pull(MessageQueue mq, String subExpression, long offset, int maxNums
     public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback,
         long timeout)
         throws MQClientException, RemotingException, InterruptedException {
-        this.pullAsyncImpl(mq, subExpression, offset, maxNums, pullCallback, false, timeout);
+        SubscriptionData subscriptionData = getSubscriptionData(mq, subExpression);
+        this.pullAsyncImpl(mq, subscriptionData, offset, maxNums, pullCallback, false, timeout);
+    }
+
+    public void pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums,
+        PullCallback pullCallback)
+        throws MQClientException, RemotingException, InterruptedException {
+        pull(mq, messageSelector, offset, maxNums, pullCallback, this.defaultMQPullConsumer.getConsumerPullTimeoutMillis());
+    }
+
+    public void pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums,
+        PullCallback pullCallback,
+        long timeout)
+        throws MQClientException, RemotingException, InterruptedException {
+        SubscriptionData subscriptionData = getSubscriptionData(mq, messageSelector);
+        this.pullAsyncImpl(mq, subscriptionData, offset, maxNums, pullCallback, false, timeout);
     }
 
     private void pullAsyncImpl(
         final MessageQueue mq,
-        final String subExpression,
+        final SubscriptionData subscriptionData,
         final long offset,
         final int maxNums,
         final PullCallback pullCallback,
@@ -403,20 +455,14 @@ private void pullAsyncImpl(
         try {
             int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
 
-            final SubscriptionData subscriptionData;
-            try {
-                subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),
-                    mq.getTopic(), subExpression);
-            } catch (Exception e) {
-                throw new MQClientException("parse subscription error", e);
-            }
-
             long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
 
+            boolean isTagType = ExpressionType.isTagType(subscriptionData.getExpressionType());
             this.pullAPIWrapper.pullKernelImpl(
                 mq,
                 subscriptionData.getSubString(),
-                0L,
+                subscriptionData.getExpressionType(),
+                isTagType ? 0L : subscriptionData.getSubVersion(),
                 offset,
                 maxNums,
                 sysFlag,
@@ -444,7 +490,8 @@ public void onException(Throwable e) {
 
     public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums)
         throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
-        return this.pullSyncImpl(mq, subExpression, offset, maxNums, true, this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
+        SubscriptionData subscriptionData = getSubscriptionData(mq, subExpression);
+        return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, true, this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
     }
 
     public DefaultMQPullConsumer getDefaultMQPullConsumer() {
@@ -454,7 +501,8 @@ public DefaultMQPullConsumer getDefaultMQPullConsumer() {
     public void pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums,
         PullCallback pullCallback)
         throws MQClientException, RemotingException, InterruptedException {
-        this.pullAsyncImpl(mq, subExpression, offset, maxNums, pullCallback, true,
+        SubscriptionData subscriptionData = getSubscriptionData(mq, subExpression);
+        this.pullAsyncImpl(mq, subscriptionData, offset, maxNums, pullCallback, true,
             this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
     }
 
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
index b650e35e0..1d2d24fa3 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
@@ -209,34 +209,6 @@ public PullResult pullKernelImpl(
         throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
     }
 
-    public PullResult pullKernelImpl(
-        final MessageQueue mq,
-        final String subExpression,
-        final long subVersion,
-        final long offset,
-        final int maxNums,
-        final int sysFlag,
-        final long commitOffset,
-        final long brokerSuspendMaxTimeMillis,
-        final long timeoutMillis,
-        final CommunicationMode communicationMode,
-        final PullCallback pullCallback
-    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
-        return pullKernelImpl(
-            mq,
-            subExpression,
-            ExpressionType.TAG,
-            subVersion, offset,
-            maxNums,
-            sysFlag,
-            commitOffset,
-            brokerSuspendMaxTimeMillis,
-            timeoutMillis,
-            communicationMode,
-            pullCallback
-        );
-    }
-
     public long recalculatePullFromWhichNode(final MessageQueue mq) {
         if (this.isConnectBrokerByUser()) {
             return this.defaultBrokerId;
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 9ffaed0a4..80347d105 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -1046,6 +1046,19 @@ public int findBrokerVersion(String brokerName, String brokerAddr) {
             if (this.brokerVersionTable.get(brokerName).containsKey(brokerAddr)) {
                 return this.brokerVersionTable.get(brokerName).get(brokerAddr);
             }
+        } else {
+            HeartbeatData heartbeatData = prepareHeartbeatData();
+            try {
+                int version = this.mQClientAPIImpl.sendHearbeat(brokerAddr, heartbeatData, 3000);
+                return version;
+            } catch (Exception e) {
+                if (this.isBrokerInNameServer(brokerAddr)) {
+                    log.info("send heart beat to broker[{} {}] failed", brokerName, brokerAddr);
+                } else {
+                    log.info("send heart beat to broker[{} {}] exception, because the broker not up, forget it", brokerName,
+                        brokerAddr);
+                }
+            }
         }
         return 0;
     }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 7ace9d5b0..5c0d7c312 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -1127,6 +1127,9 @@ public TransactionSendResult sendMessageInTransaction(final Message msg,
             throw new MQClientException("tranExecutor is null", null);
         }
         Validators.checkMessage(msg, this.defaultMQProducer);
+        if (msg.getDelayTimeLevel() > 0) {
+            throw new UnsupportedOperationException("TimeDelayLevel is not supported for transactional message");
+        }
 
         SendResult sendResult = null;
         MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
diff --git a/pom.xml b/pom.xml
index 0a8fef875..6bc3a2bd5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -259,6 +259,11 @@
                         <exclude>src/test/resources/META-INF/service/*</exclude>
                         <exclude>*/target/**</exclude>
                         <exclude>*/*.iml</exclude>
+                        <exclude>**/*.md</exclude>
+                        <exclude>**/*.yml</exclude>
+                        <exclude>**/*.key</exclude>
+                        <exclude>**/service/org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener</exclude>
+                        <exclude>**/service/org.apache.rocketmq.broker.transaction.TransactionalMessageService</exclude>
                     </excludes>
                 </configuration>
             </plugin>
diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 02aa84a3e..8d60321ed 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -604,7 +604,7 @@ public void setDefaultQueryMaxNum(int defaultQueryMaxNum) {
     }
 
     /**
-     * Enable transient commitLog store poll only if transientStorePoolEnable is true and the FlushDiskType is
+     * Enable transient commitLog store pool only if transientStorePoolEnable is true and the FlushDiskType is
      * ASYNC_FLUSH
      *
      * @return <tt>true</tt> or <tt>false</tt>
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java
index 79f15dcc9..a0f6555ce 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java
@@ -17,12 +17,18 @@
 
 package org.apache.rocketmq.test.client.consumer.filter;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import org.apache.log4j.Logger;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
 import org.apache.rocketmq.client.consumer.MessageSelector;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.test.base.BaseConf;
-import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT;
-import org.apache.rocketmq.test.client.consumer.broadcast.normal.NormalMsgTwoSameGroupConsumerIT;
-import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
 import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
 import org.apache.rocketmq.test.client.rmq.RMQSqlConsumer;
 import org.apache.rocketmq.test.factory.ConsumerFactory;
@@ -39,12 +45,14 @@
     private static Logger logger = Logger.getLogger(SqlFilterIT.class);
     private RMQNormalProducer producer = null;
     private String topic = null;
+    private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();
 
     @Before
     public void setUp() {
         topic = initTopic();
         logger.info(String.format("use topic: %s;", topic));
         producer = getProducer(nsAddr, topic);
+        OFFSE_TABLE.clear();
     }
 
     @After
@@ -71,4 +79,65 @@ public void testFilterConsumer() throws Exception {
 
         assertThat(consumer.getListener().getAllMsgBody().size()).isEqualTo(msgSize * 2);
     }
+
+    @Test
+    public void testFilterPullConsumer() throws Exception {
+        int msgSize = 16;
+
+        String group = initConsumerGroup();
+        MessageSelector selector = MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))");
+        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(group);
+        consumer.setNamesrvAddr(nsAddr);
+        consumer.start();
+        Thread.sleep(3000);
+        producer.send("TagA", msgSize);
+        producer.send("TagB", msgSize);
+        producer.send("TagC", msgSize);
+        Assert.assertEquals("Not all sent succeeded", msgSize * 3, producer.getAllUndupMsgBody().size());
+
+        List<String> receivedMessage = new ArrayList<>(2);
+        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(topic);
+        for (MessageQueue mq : mqs) {
+            SINGLE_MQ:
+            while (true) {
+                try {
+                    PullResult pullResult =
+                        consumer.pull(mq, selector, getMessageQueueOffset(mq), 32);
+                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
+                    switch (pullResult.getPullStatus()) {
+                        case FOUND:
+                            List<MessageExt> msgs = pullResult.getMsgFoundList();
+                            for (MessageExt msg : msgs) {
+                                receivedMessage.add(new String(msg.getBody()));
+                            }
+                            break;
+                        case NO_MATCHED_MSG:
+                            break;
+                        case NO_NEW_MSG:
+                            break SINGLE_MQ;
+                        case OFFSET_ILLEGAL:
+                            break;
+                        default:
+                            break;
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+
+        assertThat(receivedMessage.size()).isEqualTo(msgSize * 2);
+    }
+
+    private static long getMessageQueueOffset(MessageQueue mq) {
+        Long offset = OFFSE_TABLE.get(mq);
+        if (offset != null)
+            return offset;
+
+        return 0;
+    }
+
+    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
+        OFFSE_TABLE.put(mq, offset);
+    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services