You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/07/15 09:34:11 UTC
[rocketmq] branch develop updated: [ISSUE #4606] Build trace msgs for DefaultLitePullConsumer when poll is called
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 00da3e724 [ISSUE #4606] Build trace msgs for DefaultLitePullConsumer when poll is called
00da3e724 is described below
commit 00da3e7249d2339717d96f7ea2ab8e4a2fd4242c
Author: cserwen <cs...@163.com>
AuthorDate: Fri Jul 15 17:33:54 2022 +0800
[ISSUE #4606] Build trace msgs for DefaultLitePullConsumer when poll is called
Co-authored-by: dengzhiwen1 <de...@xiaomi.com>
---
.../impl/consumer/DefaultLitePullConsumerImpl.java | 24 +++++++++++-----------
1 file changed, 12 insertions(+), 12 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index 58dfd8eda..9b300a87e 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -568,6 +568,18 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
assignedMessageQueue.updateConsumeOffset(consumeRequest.getMessageQueue(), offset);
//If namespace not null , reset Topic without namespace.
this.resetTopic(messages);
+ if (!this.consumeMessageHookList.isEmpty()) {
+ ConsumeMessageContext consumeMessageContext = new ConsumeMessageContext();
+ consumeMessageContext.setNamespace(defaultLitePullConsumer.getNamespace());
+ consumeMessageContext.setConsumerGroup(this.groupName());
+ consumeMessageContext.setMq(consumeRequest.getMessageQueue());
+ consumeMessageContext.setMsgList(messages);
+ consumeMessageContext.setSuccess(false);
+ this.executeHookBefore(consumeMessageContext);
+ consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
+ consumeMessageContext.setSuccess(true);
+ this.executeHookAfter(consumeMessageContext);
+ }
return messages;
}
} catch (InterruptedException ignore) {
@@ -949,18 +961,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
null
);
this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
- if (!this.consumeMessageHookList.isEmpty()) {
- ConsumeMessageContext consumeMessageContext = new ConsumeMessageContext();
- consumeMessageContext.setNamespace(defaultLitePullConsumer.getNamespace());
- consumeMessageContext.setConsumerGroup(this.groupName());
- consumeMessageContext.setMq(mq);
- consumeMessageContext.setMsgList(pullResult.getMsgFoundList());
- consumeMessageContext.setSuccess(false);
- this.executeHookBefore(consumeMessageContext);
- consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
- consumeMessageContext.setSuccess(true);
- this.executeHookAfter(consumeMessageContext);
- }
return pullResult;
}