Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/05/28 08:55:48 UTC

[GitHub] [pulsar] KannarFr opened a new issue #4391: pulsar consumption ack strange behavior

KannarFr opened a new issue #4391: pulsar consumption ack strange behavior
URL: https://github.com/apache/pulsar/issues/4391
 
 
   **Describe the bug**
   I create an Exclusive consumer on a persistent topic. I consume all the messages using pulsar4s (the Scala library), which sends an ack immediately after receiving each message, using its ConsumerMessage class.
   
   pulsar4s uses the Java Pulsar client underneath, which produces this log: ```10:43:45.806 [pulsar-timer-4-1] INFO  o.a.p.c.i.ConsumerStatsRecorderImpl - [persistent://clevercloud/logs/communications-paths] [towarp10] [18114] Prefetched messages: 0 --- Consume throughput received: 53.68 msgs/s --- 0.10 Mbit/s --- Ack sent rate: 68.68 ack/s --- Failed messages: 0 --- Failed acks: 0```.
   
   So all messages in the topic appear to have been consumed, since my consumer is now waiting for new input. Everything seems fine.
   
   But if I rerun my program (my consumer), all of these messages are delivered again. I don't understand why.
   
   Here are the debug logs:
   
   ```10:53:08.265 [warp10-akka.actor.default-dispatcher-6] DEBUG o.a.pulsar.client.impl.ConsumerImpl - [towarp10] [1e334] acknowledging message - 642:4099:-1:0, acktype Individual
   10:53:08.265 [warp10-akka.actor.default-dispatcher-7] DEBUG c.s.p.a.s.PulsarSourceGraphStage - Pull received; asking consumer for message
   10:53:08.267 [pulsar-client-io-1-1] DEBUG o.a.p.c.i.PersistentAcknowledgmentsGroupingTracker - [ConsumerBase{subscription='towarp10', consumerName='1e334', topic='persistent://clevercloud/logs/communications-paths'}] Flushing pending acks to broker: last-cumulative-ack: -1:-1:-1 -- individual-acks: [642:4002:-1:0, 642:4003:-1:0, 642:4004:-1:0, 642:4005:-1:0, 642:4006:-1:0, 642:4007:-1:0, 642:4008:-1:0, 642:4009:-1:0, 642:4010:-1:0, 642:4011:-1:0, 642:4012:-1:0, 642:4013:-1:0, 642:4014:-1:0, 642:4015:-1:0, 642:4016:-1:0, 642:4017:-1:0, 642:4018:-1:0, 642:4019:-1:0, 642:4020:-1:0, 642:4021:-1:0, 642:4022:-1:0, 642:4023:-1:0, 642:4024:-1:0, 642:4025:-1:0, 642:4026:-1:0, 642:4027:-1:0, 642:4028:-1:0, 642:4029:-1:0, 642:4030:-1:0, 642:4031:-1:0, 642:4032:-1:0, 642:4033:-1:0, 642:4034:-1:0, 642:4035:-1:0, 642:4036:-1:0, 642:4037:-1:0, 642:4038:-1:0, 642:4039:-1:0, 642:4040:-1:0, 642:4041:-1:0, 642:4042:-1:0, 642:4043:-1:0, 642:4044:-1:0, 642:4045:-1:0, 642:4046:-1:0, 642:4047:-1:0, 642:4048:-1:0, 642:4049:-1:0, 642:4050:-1:0, 642:4051:-1:0, 642:4052:-1:0, 642:4053:-1:0, 642:4054:-1:0, 642:4055:-1:0, 642:4056:-1:0, 642:4057:-1:0, 642:4058:-1:0, 642:4059:-1:0, 642:4060:-1:0, 642:4061:-1:0, 642:4062:-1:0, 642:4063:-1:0, 642:4064:-1:0, 642:4065:-1:0, 642:4066:-1:0, 642:4067:-1:0, 642:4068:-1:0, 642:4069:-1:0, 642:4070:-1:0, 642:4071:-1:0, 642:4072:-1:0, 642:4073:-1:0, 642:4074:-1:0, 642:4075:-1:0, 642:4076:-1:0, 642:4077:-1:0, 642:4078:-1:0, 642:4079:-1:0, 642:4080:-1:0, 642:4081:-1:0, 642:4082:-1:0, 642:4083:-1:0, 642:4084:-1:0, 642:4085:-1:0, 642:4086:-1:0, 642:4087:-1:0, 642:4088:-1:0, 642:4089:-1:0, 642:4090:-1:0, 642:4091:-1:0, 642:4092:-1:0, 642:4093:-1:0, 642:4094:-1:0, 642:4095:-1:0, 642:4096:-1:0, 642:4097:-1:0, 642:4098:-1:0, 642:4099:-1:0]
   10:53:08.281 [warp10-akka.actor.default-dispatcher-6] DEBUG c.s.p.a.s.PulsarSourceGraphStage - Msg received DefaultConsumerMessage(Some(),{"s":{"ip":"43.34.34.34","lt":3.453252,"lg":2.2134331,"ct":"Rouen","co":"France"},"d":{"ip":"143.134.134.134","lt":13.453252,"lg":12.2134331,"ct":"Paris","co":"France"},"vb":"GET","path":"/","scheme":"http","a":"app_123456342141","o":"orga_33"}
   ,[B@25d58f05,Map(),Pulsar4sMessageIdImpl(642:4100:-1:0),SequenceId(979),ProducerName(c1-42-140),PublishTime(1558619780340),EventTime(0),Topic(persistent://clevercloud/logs/communications-paths))
   10:53:08.282 [warp10-akka.actor.default-dispatcher-6] DEBUG o.a.pulsar.client.impl.ConsumerImpl - [towarp10] [1e334] can ack message to broker 642:4100:-1:0, acktype Individual, cardinality 0, length 1
   10:53:08.282 [warp10-akka.actor.default-dispatcher-6] DEBUG o.a.pulsar.client.impl.ConsumerImpl - [towarp10] [1e334] acknowledging message - 642:4100:-1:0, acktype Individual
   10:53:08.282 [warp10-akka.actor.default-dispatcher-7] DEBUG c.s.p.a.s.PulsarSourceGraphStage - Pull received; asking consumer for message
   10:53:08.282 [warp10-akka.actor.default-dispatcher-7] DEBUG c.s.p.a.s.PulsarSourceGraphStage - Msg received DefaultConsumerMessage(Some(),{"s":{"ip":"43.34.34.34","lt":3.453252,"lg":2.2134331,"ct":"Rouen","co":"France"},"d":{"ip":"143.134.134.134","lt":13.453252,"lg":12.2134331,"ct":"Paris","co":"France"},"vb":"GET","path":"/","scheme":"http","a":"app_123456342141","o":"orga_33"}
   ,[B@3a2729c,Map(),Pulsar4sMessageIdImpl(642:4101:-1:0),SequenceId(980),ProducerName(c1-42-140),PublishTime(1558619780346),EventTime(0),Topic(persistent://clevercloud/logs/communications-paths))
   10:53:08.282 [warp10-akka.actor.default-dispatcher-7] DEBUG o.a.pulsar.client.impl.ConsumerImpl - [towarp10] [1e334] can ack message to broker 642:4101:-1:0, acktype Individual, cardinality 0, length 1```
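
One way to inspect the `individual-acks` list in the flush line above is to check whether the acked entries leave any gaps. The sketch below is only an illustration, not part of the Pulsar client: it assumes each ID is printed as `ledgerId:entryId:partitionIndex:batchIndex` and that all IDs in the list belong to the same ledger. The class `AckRangeCheck`, its methods, and the sample string in `main` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

public class AckRangeCheck {

    // Parses an ID assumed to be printed as ledgerId:entryId:partitionIndex:batchIndex.
    public static long[] parseMessageId(String id) {
        String[] parts = id.trim().split(":");
        if (parts.length != 4) {
            throw new IllegalArgumentException("Unexpected message ID: " + id);
        }
        long[] fields = new long[4];
        for (int i = 0; i < 4; i++) {
            fields[i] = Long.parseLong(parts[i]);
        }
        return fields;
    }

    // Returns the entry IDs missing between the lowest and highest acked entry.
    // Assumes every ID in the list belongs to the same ledger.
    public static List<Long> missingEntries(String ackList) {
        String body = ackList.trim();
        if (body.startsWith("[")) {
            body = body.substring(1);
        }
        if (body.endsWith("]")) {
            body = body.substring(0, body.length() - 1);
        }
        List<Long> missing = new ArrayList<>();
        if (body.trim().isEmpty()) {
            return missing;
        }
        // Collect the entryId field (index 1) of every acked message, sorted.
        TreeSet<Long> entries = new TreeSet<>();
        for (String id : body.split(",")) {
            entries.add(parseMessageId(id)[1]);
        }
        for (long e = entries.first(); e <= entries.last(); e++) {
            if (!entries.contains(e)) {
                missing.add(e);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Hypothetical sample with a deliberate gap at entry 4004.
        String sample = "[642:4002:-1:0, 642:4003:-1:0, 642:4005:-1:0]";
        System.out.println("Missing entries: " + missingEntries(sample));
    }
}
```

Pasting the bracketed list from the log into `missingEntries` would show whether every entry between the first and last acked one was included in the flush. It says nothing about whether the broker persisted those acks.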
   
   If you need more information, do not hesitate to ask.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services