Posted to dev@eventmesh.apache.org by GitBox <gi...@apache.org> on 2021/06/14 14:35:40 UTC

[GitHub] [incubator-eventmesh] jinrongluo commented on issue #386: ConsumerGroup Queue Offset is not synced up correctly after PullConsumer has consumer all the messages in the topic

jinrongluo commented on issue #386:
URL: https://github.com/apache/incubator-eventmesh/issues/386#issuecomment-860735664


   Root Cause Analysis:
   
   After studying the EventMesh code and debug logs, I found a race condition caused by a shared class variable in the eventmesh-connector-rocketmq module.
   
   In `org.apache.eventmesh.connector.rocketmq.consumer.PushConsumerImpl.java`, every time a new message is consumed, the message's context is stored in the `PushConsumerImpl.context` variable:
   
   https://github.com/apache/incubator-eventmesh/blob/develop/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/consumer/PushConsumerImpl.java#L134-L136
   
   This context is passed through several layers of consumer handlers and is finally used in `org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.java` to update the Queue Consumer Offset back to RocketMQ:
   https://github.com/apache/incubator-eventmesh/blob/develop/eventmesh-connector-rocketmq/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java#L352-L364
   
   However, I found that the `PushConsumerImpl.context` variable is shared by multiple threads in the EventMeshMessageListenerConcurrently object once it is registered with DefaultMQPushConsumer.
   
   So every new message overwrites `PushConsumerImpl.context` with its own context, and the previous message's context is lost. As a result, the previous message can no longer update its Queue Consumer Offset using its own context.
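
   The overwrite described above can be reproduced in isolation. The following is only a sketch: `SharedFieldListener` and `Ctx` are hypothetical stand-ins, not the EventMesh or RocketMQ classes, and the two-thread interleaving is replayed sequentially so the result is deterministic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration; NOT the EventMesh or RocketMQ classes.
class SharedContextDemo {
    /** Minimal per-message context: just the offset to acknowledge. */
    static final class Ctx {
        final long offset;
        Ctx(long offset) { this.offset = offset; }
    }

    /** Mimics a listener that keeps the current context in one shared field. */
    static final class SharedFieldListener {
        private Ctx context;                        // shared by every message
        void onMessage(long offset) { context = new Ctx(offset); }
        long ack() { return context.offset; }       // reads whatever is stored now
    }

    /** Replays the interleaving: both messages arrive before either is acked. */
    static List<Long> ackedOffsets() {
        SharedFieldListener listener = new SharedFieldListener();
        listener.onMessage(100L);   // "thread A" starts the message at offset 100
        listener.onMessage(101L);   // "thread B" starts offset 101 and overwrites A's context
        List<Long> acked = new ArrayList<>();
        acked.add(listener.ack());  // A acks, but sees B's context
        acked.add(listener.ack());  // B acks
        return acked;
    }

    public static void main(String[] args) {
        System.out.println(ackedOffsets()); // prints [101, 101]
    }
}
```

   Offset 100 is never acknowledged in this replay, which matches the symptom of the old message losing its own context.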
   
   We should not let all messages share the same PushConsumerImpl.context class variable. Instead, each message should keep its own context and pass it through the consumer handlers unmodified.
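
   One possible shape for this, as a sketch only and not the actual patch: create an immutable context per message and hand it to the handler as a parameter. The names `Ctx`, `handle`, and `consume` are hypothetical, and a plain thread pool stands in for the RocketMQ consumer threads.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration; NOT the EventMesh or RocketMQ classes.
class PerMessageContextDemo {
    /** Immutable context, created once per message and never shared. */
    static final class Ctx {
        final String msgId;
        final long offset;
        Ctx(String msgId, long offset) { this.msgId = msgId; this.offset = offset; }
    }

    /** The handler receives the context as an argument instead of reading a field. */
    static void handle(Ctx ctx, Map<String, Long> acked) {
        acked.put(ctx.msgId, ctx.offset);
    }

    /** Consumes messageCount messages concurrently and returns the acked offsets. */
    static Map<String, Long> consume(int messageCount) {
        Map<String, Long> acked = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < messageCount; i++) {
            final Ctx ctx = new Ctx("msg-" + i, 100L + i); // one context per message
            pool.submit(() -> handle(ctx, acked));
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("handlers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return acked;
    }

    public static void main(String[] args) {
        // Every message should be acknowledged with its own offset.
        System.out.println(consume(1000).size());
    }
}
```

   Because no handler reads mutable shared state, the order in which threads run no longer affects which offset each message acknowledges.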
   
   I will propose a fix based on this analysis.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org