You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/07/15 09:34:11 UTC

[rocketmq] branch develop updated: [ISSUE #4606] Build trace msgs for DefaultLitePullConsumer when poll is called

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 00da3e724 [ISSUE #4606] Build trace msgs for DefaultLitePullConsumer when poll is called
00da3e724 is described below

commit 00da3e7249d2339717d96f7ea2ab8e4a2fd4242c
Author: cserwen <cs...@163.com>
AuthorDate: Fri Jul 15 17:33:54 2022 +0800

    [ISSUE #4606] Build trace msgs for DefaultLitePullConsumer when poll is called
    
    Co-authored-by: dengzhiwen1 <de...@xiaomi.com>
---
 .../impl/consumer/DefaultLitePullConsumerImpl.java | 24 +++++++++++-----------
 1 file changed, 12 insertions(+), 12 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index 58dfd8eda..9b300a87e 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -568,6 +568,18 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
                 assignedMessageQueue.updateConsumeOffset(consumeRequest.getMessageQueue(), offset);
                 //If namespace not null , reset Topic without namespace.
                 this.resetTopic(messages);
+                if (!this.consumeMessageHookList.isEmpty()) {
+                    ConsumeMessageContext consumeMessageContext = new ConsumeMessageContext();
+                    consumeMessageContext.setNamespace(defaultLitePullConsumer.getNamespace());
+                    consumeMessageContext.setConsumerGroup(this.groupName());
+                    consumeMessageContext.setMq(consumeRequest.getMessageQueue());
+                    consumeMessageContext.setMsgList(messages);
+                    consumeMessageContext.setSuccess(false);
+                    this.executeHookBefore(consumeMessageContext);
+                    consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
+                    consumeMessageContext.setSuccess(true);
+                    this.executeHookAfter(consumeMessageContext);
+                }
                 return messages;
             }
         } catch (InterruptedException ignore) {
@@ -949,18 +961,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
             null
         );
         this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
-        if (!this.consumeMessageHookList.isEmpty()) {
-            ConsumeMessageContext consumeMessageContext = new ConsumeMessageContext();
-            consumeMessageContext.setNamespace(defaultLitePullConsumer.getNamespace());
-            consumeMessageContext.setConsumerGroup(this.groupName());
-            consumeMessageContext.setMq(mq);
-            consumeMessageContext.setMsgList(pullResult.getMsgFoundList());
-            consumeMessageContext.setSuccess(false);
-            this.executeHookBefore(consumeMessageContext);
-            consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
-            consumeMessageContext.setSuccess(true);
-            this.executeHookAfter(consumeMessageContext);
-        }
         return pullResult;
     }