You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2021/04/23 10:02:35 UTC

[GitHub] [rocketmq-client-go] yaphetsglhf opened a new issue #650: Orderly Consumer will not reconsume the message

yaphetsglhf opened a new issue #650:
URL: https://github.com/apache/rocketmq-client-go/issues/650


   we try to use the orderly consumer, but when we return the `SuspendCurrentQueueAMoment` , we found the message will not be reconsumed again.
   
   Another problem is that when we send another message which commit `ConsumeSuccess`,  all the messages including the blocked ones will commited all at once, which will lead to message missing.
   
   how to init the consumer :
   ```
   	c.consumer, err = rocketmq.NewPushConsumer(
   		consumer.WithGroupName(c.GroupName),
   		consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{config.ENV.MQConnectionString})),
   		consumer.WithConsumerModel(consumer.Clustering),
   		consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),
   		consumer.WithConsumerOrder(true),
   		consumer.WithMaxReconsumeTimes(5),
   	)
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq-client-go] qianlongzt commented on issue #650: Orderly Consumer will not reconsume the message

Posted by GitBox <gi...@apache.org>.
qianlongzt commented on issue #650:
URL: https://github.com/apache/rocketmq-client-go/issues/650#issuecomment-832581056


   ```go
   type processQueue struct {
   	cachedMsgCount             int64
   	cachedMsgSize              int64
   
   	msgCache                   *treemap.Map
   	consumingMsgOrderlyTreeMap *treemap.Map
   }
   ```
   i think `cachedMsgCount`  mean pulled msgCount ,now  equal `msgCache.Size() + consumingMsgOrderlyTreeMap.Size()`, used by https://github.com/apache/rocketmq-client-go/blob/dfa26d159f9f47cf38c8788dc3ae9443bad61e0d/consumer/push_consumer.go#L583 
   
   first problem is #615 ,if consuming orderly and failed, the cachedMsgCount  will increase and can't consume any more.
   I send a pr to fix #616 ,but got another problem( #618   msg lost when failed).
   so pr #619 make cachedMsgCount and msg both ok
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq-client-go] qianlongzt removed a comment on issue #650: Orderly Consumer will not reconsume the message

Posted by GitBox <gi...@apache.org>.
qianlongzt removed a comment on issue #650:
URL: https://github.com/apache/rocketmq-client-go/issues/650#issuecomment-832671089


   for now, prombem takeMessages
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq-client-go] yaphetsglhf commented on issue #650: Orderly Consumer will not reconsume the message

Posted by GitBox <gi...@apache.org>.
yaphetsglhf commented on issue #650:
URL: https://github.com/apache/rocketmq-client-go/issues/650#issuecomment-832443515


   > > may be rel here bug #618
   > 
   > yep, I check the code, but i still have a question. may `pq.msgCache.Put(msg.QueueOffset, msg)` dismatch `cachedMsgCount`, as `cachedMsgCount` means `msgCache.Size()`
   
   If message not in `msgCache`, but in `consumingMsgOrderlyTreeMap`, then message will put back to msgCache, but the cachedMsgCount will not increase.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq-client-go] yaphetsglhf commented on issue #650: Orderly Consumer will not reconsume the message

Posted by GitBox <gi...@apache.org>.
yaphetsglhf commented on issue #650:
URL: https://github.com/apache/rocketmq-client-go/issues/650#issuecomment-832654742


   > ```go
   > type processQueue struct {
   > 	cachedMsgCount             int64
   > 	cachedMsgSize              int64
   > 
   > 	msgCache                   *treemap.Map
   > 	consumingMsgOrderlyTreeMap *treemap.Map
   > }
   > ```
   > 
   > i think `cachedMsgCount` mean pulled msgCount ,now equal `msgCache.Size() + consumingMsgOrderlyTreeMap.Size()`, used by
   > 
   > https://github.com/apache/rocketmq-client-go/blob/dfa26d159f9f47cf38c8788dc3ae9443bad61e0d/consumer/push_consumer.go#L583
   > 
   > first problem is #615 ,if consuming orderly and failed, the cachedMsgCount will increase and can't consume any more.
   > I send a pr to fix #616 ,but got another problem( #618 msg lost when failed).
   > so pr #619 make cachedMsgCount and msg both ok
   
   as you said, `cachedMsgCount  = msgCache.Size() + consumingMsgOrderlyTreeMap.Size()`, if you put one message back to `msgCache`, the cachedMsgCount should plus one at the same time.  Or else, you just mean the total amount of the `cachedMsgCount` should not change, but the message will in both `msgCache` and `consumingMsgOrderlyTreeMap`, so it's not necessary to increate the `cachedMsgCount`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq-client-go] yaphetsglhf commented on issue #650: Orderly Consumer will not reconsume the message

Posted by GitBox <gi...@apache.org>.
yaphetsglhf commented on issue #650:
URL: https://github.com/apache/rocketmq-client-go/issues/650#issuecomment-832442765


   > may be rel here bug #618
   
   yep, I check the code, but i still have a question. may `pq.msgCache.Put(msg.QueueOffset, msg)`  dismatch `cachedMsgCount`, as `cachedMsgCount` means `msgCache.Size()`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq-client-go] yaphetsglhf commented on issue #650: Orderly Consumer will not reconsume the message

Posted by GitBox <gi...@apache.org>.
yaphetsglhf commented on issue #650:
URL: https://github.com/apache/rocketmq-client-go/issues/650#issuecomment-826103908


   well, I found that If I consumer message failed and return `SuspendCurrentQueueAMoment`, then next time, when I pull messages again, I will only get the latest messages. but if i restart the container, I will receive messages from the earliest offset, then I will get the former failed messges.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq-client-go] maixiaohai closed issue #650: Orderly Consumer will not reconsume the message

Posted by GitBox <gi...@apache.org>.
maixiaohai closed issue #650:
URL: https://github.com/apache/rocketmq-client-go/issues/650


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq-client-go] qianlongzt commented on issue #650: Orderly Consumer will not reconsume the message

Posted by GitBox <gi...@apache.org>.
qianlongzt commented on issue #650:
URL: https://github.com/apache/rocketmq-client-go/issues/650#issuecomment-832671089


   for now, prombem takeMessages
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq-client-go] qianlongzt commented on issue #650: Orderly Consumer will not reconsume the message

Posted by GitBox <gi...@apache.org>.
qianlongzt commented on issue #650:
URL: https://github.com/apache/rocketmq-client-go/issues/650#issuecomment-832677194


   I can't say problem is `takeMessages`, java code have `makeMessageToConsumeAgain` do this work
   
   https://github.com/apache/rocketmq/blob/b4240d5cea8d001c21b9c0d73f5aa700fcd0d568/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java#L283-L297
   
   i think problem is `consumeMessageCurrently` https://github.com/apache/rocketmq/blob/master/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java#L282-L288
   
   https://github.com/apache/rocketmq-client-go/blob/master/consumer/push_consumer.go#L1158-L1163


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq-client-go] yaphetsglhf commented on issue #650: Orderly Consumer will not reconsume the message

Posted by GitBox <gi...@apache.org>.
yaphetsglhf commented on issue #650:
URL: https://github.com/apache/rocketmq-client-go/issues/650#issuecomment-826103908


   well, I found that If I consumer message failed and return `SuspendCurrentQueueAMoment`, then next time, when I pull messages again, I will only get the latest messages. but if i restart the container, I will receive messages from the earliest offset, then I will get the former failed messges.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq-client-go] qianlongzt commented on issue #650: Orderly Consumer will not reconsume the message

Posted by GitBox <gi...@apache.org>.
qianlongzt commented on issue #650:
URL: https://github.com/apache/rocketmq-client-go/issues/650#issuecomment-829846451


   may be rel here bug https://github.com/apache/rocketmq-client-go/issues/618 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq-client-go] yaphetsglhf commented on issue #650: Orderly Consumer will not reconsume the message

Posted by GitBox <gi...@apache.org>.
yaphetsglhf commented on issue #650:
URL: https://github.com/apache/rocketmq-client-go/issues/650#issuecomment-832657556


   I review the code, I think the problem is the funciton `takeMessages` can only get the new messages with `pq.msgCache.Min()`, and the new taken messages will commit successfully, leading to the message missing. an i right? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org