Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2021/04/23 10:02:35 UTC
[GitHub] [rocketmq-client-go] yaphetsglhf opened a new issue #650: Orderly Consumer will not reconsume the message
yaphetsglhf opened a new issue #650:
URL: https://github.com/apache/rocketmq-client-go/issues/650
We are trying to use the orderly consumer, but when we return `SuspendCurrentQueueAMoment`, we found that the message is not reconsumed.
Another problem is that when we send another message that is committed with `ConsumeSuccess`, all the messages, including the blocked ones, are committed at once, which leads to message loss.
How we initialize the consumer:
```go
c.consumer, err = rocketmq.NewPushConsumer(
	consumer.WithGroupName(c.GroupName),
	consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{config.ENV.MQConnectionString})),
	consumer.WithConsumerModel(consumer.Clustering),
	consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),
	consumer.WithConsumerOrder(true),
	consumer.WithMaxReconsumeTimes(5),
)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [rocketmq-client-go] qianlongzt commented on issue #650: Orderly Consumer will not reconsume the message
Posted by GitBox <gi...@apache.org>.
qianlongzt commented on issue #650:
URL: https://github.com/apache/rocketmq-client-go/issues/650#issuecomment-832581056
```go
type processQueue struct {
	cachedMsgCount             int64
	cachedMsgSize              int64
	msgCache                   *treemap.Map
	consumingMsgOrderlyTreeMap *treemap.Map
}
```
I think `cachedMsgCount` means the pulled message count, which now equals `msgCache.Size() + consumingMsgOrderlyTreeMap.Size()`. It is used by https://github.com/apache/rocketmq-client-go/blob/dfa26d159f9f47cf38c8788dc3ae9443bad61e0d/consumer/push_consumer.go#L583
The first problem is #615: if orderly consumption fails, `cachedMsgCount` keeps increasing and nothing more can be consumed.
I sent a PR to fix #616, but that led to another problem (#618, messages lost on failure).
So PR #619 makes both `cachedMsgCount` and the messages correct.
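
To make the bookkeeping above concrete, here is a minimal sketch of the invariant `cachedMsgCount == msgCache.Size() + consumingMsgOrderlyTreeMap.Size()`. It is not the client's code: plain Go maps stand in for the treemaps, message bodies are strings, and the method names `put`, `take` and `invariantHolds` are hypothetical.

```go
package main

import "fmt"

// processQueue is a toy stand-in for the client's struct. Plain maps keyed by
// queue offset replace the treemaps (an assumption made for brevity).
type processQueue struct {
	cachedMsgCount int64
	msgCache       map[int64]string // pulled, waiting to be consumed
	consuming      map[int64]string // handed to the orderly listener
}

func newProcessQueue() *processQueue {
	return &processQueue{msgCache: map[int64]string{}, consuming: map[int64]string{}}
}

// put models a pull: a message the client does not hold yet enters msgCache
// and the counter grows by one.
func (pq *processQueue) put(offset int64, body string) {
	_, cached := pq.msgCache[offset]
	_, inFlight := pq.consuming[offset]
	if !cached && !inFlight {
		pq.cachedMsgCount++
	}
	pq.msgCache[offset] = body
}

// take moves one message into the in-flight map. The counter is unchanged
// because the client still holds the message.
func (pq *processQueue) take(offset int64) {
	if body, ok := pq.msgCache[offset]; ok {
		delete(pq.msgCache, offset)
		pq.consuming[offset] = body
	}
}

// invariantHolds reports whether the counter matches the two maps.
func (pq *processQueue) invariantHolds() bool {
	return pq.cachedMsgCount == int64(len(pq.msgCache)+len(pq.consuming))
}

func main() {
	pq := newProcessQueue()
	pq.put(0, "a")
	pq.put(1, "b")
	pq.take(0)
	fmt.Println(pq.cachedMsgCount, pq.invariantHolds())
}
```

Under this reading, moving a message between the two maps never needs a counter update; only pulling a new message or committing should change it.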
[GitHub] [rocketmq-client-go] qianlongzt removed a comment on issue #650: Orderly Consumer will not reconsume the message
Posted by GitBox <gi...@apache.org>.
qianlongzt removed a comment on issue #650:
URL: https://github.com/apache/rocketmq-client-go/issues/650#issuecomment-832671089
For now, the problem is `takeMessages`.
[GitHub] [rocketmq-client-go] yaphetsglhf commented on issue #650: Orderly Consumer will not reconsume the message
Posted by GitBox <gi...@apache.org>.
yaphetsglhf commented on issue #650:
URL: https://github.com/apache/rocketmq-client-go/issues/650#issuecomment-832443515
> > This may be related to bug #618
>
> Yep, I checked the code, but I still have a question. Could `pq.msgCache.Put(msg.QueueOffset, msg)` get out of sync with `cachedMsgCount`, given that `cachedMsgCount` means `msgCache.Size()`?
If a message is not in `msgCache` but is in `consumingMsgOrderlyTreeMap`, it will be put back into `msgCache`, but `cachedMsgCount` will not increase.
[GitHub] [rocketmq-client-go] yaphetsglhf commented on issue #650: Orderly Consumer will not reconsume the message
Posted by GitBox <gi...@apache.org>.
yaphetsglhf commented on issue #650:
URL: https://github.com/apache/rocketmq-client-go/issues/650#issuecomment-832654742
> ```go
> type processQueue struct {
> 	cachedMsgCount int64
> 	cachedMsgSize  int64
>
> 	msgCache                   *treemap.Map
> 	consumingMsgOrderlyTreeMap *treemap.Map
> }
> ```
>
> I think `cachedMsgCount` means the pulled message count, which now equals `msgCache.Size() + consumingMsgOrderlyTreeMap.Size()`. It is used by
>
> https://github.com/apache/rocketmq-client-go/blob/dfa26d159f9f47cf38c8788dc3ae9443bad61e0d/consumer/push_consumer.go#L583
>
> The first problem is #615: if orderly consumption fails, `cachedMsgCount` keeps increasing and nothing more can be consumed.
> I sent a PR to fix #616, but that led to another problem (#618, messages lost on failure).
> So PR #619 makes both `cachedMsgCount` and the messages correct.
As you said, `cachedMsgCount = msgCache.Size() + consumingMsgOrderlyTreeMap.Size()`, so if you put one message back into `msgCache`, `cachedMsgCount` should increase by one at the same time. Or do you mean that the total `cachedMsgCount` should not change, because the message would otherwise be counted in both `msgCache` and `consumingMsgOrderlyTreeMap`, so it is not necessary to increase `cachedMsgCount`?
[GitHub] [rocketmq-client-go] yaphetsglhf commented on issue #650: Orderly Consumer will not reconsume the message
Posted by GitBox <gi...@apache.org>.
yaphetsglhf commented on issue #650:
URL: https://github.com/apache/rocketmq-client-go/issues/650#issuecomment-832442765
> This may be related to bug #618
Yep, I checked the code, but I still have a question. Could `pq.msgCache.Put(msg.QueueOffset, msg)` get out of sync with `cachedMsgCount`, given that `cachedMsgCount` means `msgCache.Size()`?
[GitHub] [rocketmq-client-go] yaphetsglhf commented on issue #650: Orderly Consumer will not reconsume the message
Posted by GitBox <gi...@apache.org>.
yaphetsglhf commented on issue #650:
URL: https://github.com/apache/rocketmq-client-go/issues/650#issuecomment-826103908
Well, I found that if consuming a message fails and I return `SuspendCurrentQueueAMoment`, then the next time I pull messages I only get the latest ones. But if I restart the container, I receive messages from the earliest offset, and then I get the previously failed messages.
[GitHub] [rocketmq-client-go] maixiaohai closed issue #650: Orderly Consumer will not reconsume the message
Posted by GitBox <gi...@apache.org>.
maixiaohai closed issue #650:
URL: https://github.com/apache/rocketmq-client-go/issues/650
[GitHub] [rocketmq-client-go] qianlongzt commented on issue #650: Orderly Consumer will not reconsume the message
Posted by GitBox <gi...@apache.org>.
qianlongzt commented on issue #650:
URL: https://github.com/apache/rocketmq-client-go/issues/650#issuecomment-832671089
For now, the problem is `takeMessages`.
[GitHub] [rocketmq-client-go] qianlongzt commented on issue #650: Orderly Consumer will not reconsume the message
Posted by GitBox <gi...@apache.org>.
qianlongzt commented on issue #650:
URL: https://github.com/apache/rocketmq-client-go/issues/650#issuecomment-832677194
I can't say the problem is `takeMessages`; the Java code has `makeMessageToConsumeAgain` to do this work:
https://github.com/apache/rocketmq/blob/b4240d5cea8d001c21b9c0d73f5aa700fcd0d568/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java#L283-L297
I think the problem is `consumeMessageCurrently`: https://github.com/apache/rocketmq/blob/master/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java#L282-L288
https://github.com/apache/rocketmq-client-go/blob/master/consumer/push_consumer.go#L1158-L1163
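
As far as I can tell from the linked Java code, `makeMessageToConsumeAgain` removes each failed message from the in-flight map and puts it back into the pending map, so the next take returns the same offset. A rough Go sketch of that idea follows. The type `queue` and the methods `takeMin` and `makeConsumeAgain` are hypothetical names, and plain maps stand in for the treemaps; this is not the Go client's implementation.

```go
package main

import "fmt"

// queue is a toy model: both maps are keyed by queue offset.
type queue struct {
	msgCache  map[int64]string // pending messages
	consuming map[int64]string // messages handed to the listener
}

// takeMin moves the lowest-offset pending message into the in-flight map
// and returns its offset. The second result is false when nothing is pending.
func (q *queue) takeMin() (int64, bool) {
	found := false
	var lowest int64
	for off := range q.msgCache {
		if !found || off < lowest {
			lowest, found = off, true
		}
	}
	if !found {
		return 0, false
	}
	q.consuming[lowest] = q.msgCache[lowest]
	delete(q.msgCache, lowest)
	return lowest, true
}

// makeConsumeAgain puts failed in-flight messages back into the pending map,
// so a later takeMin sees them again before any newer offset.
func (q *queue) makeConsumeAgain(offsets ...int64) {
	for _, off := range offsets {
		if body, ok := q.consuming[off]; ok {
			delete(q.consuming, off)
			q.msgCache[off] = body
		}
	}
}

func main() {
	q := &queue{
		msgCache:  map[int64]string{5: "old", 6: "new"},
		consuming: map[int64]string{},
	}
	first, _ := q.takeMin()   // hands out the lowest pending offset
	q.makeConsumeAgain(first) // listener asked to suspend: put it back
	again, _ := q.takeMin()   // the same offset is handed out again
	fmt.Println(first, again)
}
```

With the put-back step in place, the failed offset is retried before offset 6 is ever handed out, which is the ordering guarantee an orderly consumer is expected to give.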
[GitHub] [rocketmq-client-go] qianlongzt commented on issue #650: Orderly Consumer will not reconsume the message
Posted by GitBox <gi...@apache.org>.
qianlongzt commented on issue #650:
URL: https://github.com/apache/rocketmq-client-go/issues/650#issuecomment-829846451
This may be related to bug https://github.com/apache/rocketmq-client-go/issues/618
[GitHub] [rocketmq-client-go] yaphetsglhf commented on issue #650: Orderly Consumer will not reconsume the message
Posted by GitBox <gi...@apache.org>.
yaphetsglhf commented on issue #650:
URL: https://github.com/apache/rocketmq-client-go/issues/650#issuecomment-832657556
I reviewed the code. I think the problem is that the function `takeMessages` can only get new messages via `pq.msgCache.Min()`, and the newly taken messages are then committed successfully, which leads to the missing messages. Am I right?
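
The scenario I have in mind can be sketched with a toy model. This is only my reading of the behavior, not the client's real code: `queue`, `takeMin`, `commit` and `simulateLoss` are hypothetical names, plain maps replace the treemaps, and I assume that commit clears everything in flight and persists the highest in-flight offset plus one.

```go
package main

import "fmt"

// queue is a toy model: both maps are keyed by queue offset.
type queue struct {
	msgCache  map[int64]string // pending messages
	consuming map[int64]string // messages handed to the listener
}

// takeMin moves the lowest-offset pending message into the in-flight map.
func (q *queue) takeMin() (int64, bool) {
	found := false
	var lowest int64
	for off := range q.msgCache {
		if !found || off < lowest {
			lowest, found = off, true
		}
	}
	if !found {
		return 0, false
	}
	q.consuming[lowest] = q.msgCache[lowest]
	delete(q.msgCache, lowest)
	return lowest, true
}

// commit clears the in-flight map and returns the next offset to persist
// (highest in-flight offset + 1), or -1 when nothing is in flight.
// This commit rule is an assumption of the sketch.
func (q *queue) commit() int64 {
	next := int64(-1)
	for off := range q.consuming {
		if off+1 > next {
			next = off + 1
		}
	}
	q.consuming = map[int64]string{}
	return next
}

// simulateLoss replays the sequence: offset 5 fails and is never put back,
// offset 6 succeeds, and the commit that follows covers both.
func simulateLoss() int64 {
	q := &queue{
		msgCache:  map[int64]string{5: "fails", 6: "succeeds"},
		consuming: map[int64]string{},
	}
	q.takeMin() // offset 5 is handed out; the listener returns suspend
	q.takeMin() // nothing moved 5 back, so the next take yields 6
	return q.commit()
}

func main() {
	fmt.Println(simulateLoss())
}
```

In this model the committed offset moves past the failed message at offset 5 even though it was never consumed successfully, which would match both symptoms: no reconsume until restart, and everything committed at once.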