Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/12/12 20:01:11 UTC

[GitHub] [pulsar] Choqs opened a new issue, #18894: [Bug] msgInCounter and msgOutCounter don't match

Choqs opened a new issue, #18894:
URL: https://github.com/apache/pulsar/issues/18894

   ### Search before asking
   
   - [X] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar.
   
   
   ### Version
   
   - System: Ubuntu 20.04
   - Image: apachepulsar/pulsar:2.10.2
   - pulsar-client==2.10.2
   
   ### Minimal reproduce step
   
   Send 2 consecutive messages from a single producer to the same non-persistent topic, which has only a single consumer listening. I wasn't able to reproduce the bug with a minimal example outside my project, which I can't share. Below is minimal code that mirrors what my application does.
   
   Code for the consumer, to be launched first.
   ```python
   from pulsar import Client
   from pulsar.schema import String, Record, AvroSchema
   
   
   # Schema.
   class SubTestSchema(Record):
       sub_test_attribute = String(required=True)
   class TestSchema(Record):
       sub_schema = SubTestSchema(required=True)
       test_attribute = String(required=True)
   
   # Consumer.
   client = Client("pulsar://localhost:6650")
   consumer = client.subscribe(
       "non-persistent://public/default/test-topic",
       "test-consumer",
       schema=AvroSchema(TestSchema)
   )
   
   
   # Consuming loop.
   last_value = None
   while True:
   
       # Receiving message.
       message = consumer.receive()
       decoded = message.value()
       consumer.acknowledge(message)
   
       # Logging.
       print(f"=================> {decoded.test_attribute}")
   
       # If the last value is the same as the current value,
       # one of the 2 messages was not received.
       if last_value == decoded.test_attribute:
           break
   
       # Updating the last value.
       last_value = decoded.test_attribute
   
   ```
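The consumer loop above only notices a loss when the same value arrives twice in a row. A more general check is sketched below. It assumes the producer stamps each message with an increasing integer (a hypothetical `seq` field that is not part of `TestSchema`), and `find_missing` is an illustrative helper name, not part of the Pulsar client.

```python
from typing import Iterable, List


def find_missing(received: Iterable[int], first: int, last: int) -> List[int]:
    """Return the sequence numbers in [first, last] that never arrived.

    Assumes a hypothetical per-message sequence number set by the producer.
    """
    seen = set(received)
    # Walk the expected range in order so gaps are reported in send order.
    return [seq for seq in range(first, last + 1) if seq not in seen]
```

For example, `find_missing([0, 1, 3, 4], 0, 4)` returns `[2]`, pinpointing which message was dropped instead of only signalling that something was.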
   
   
   Code of the producer, to be launched after.
   ```python
   from pulsar import Client
   from pulsar.schema import String, Record, AvroSchema
   import time
   
   
   # Schema.
   class SubTestSchema(Record):
       sub_test_attribute = String(required=True)
   class TestSchema(Record):
       sub_schema = SubTestSchema(required=True)
       test_attribute = String(required=True)
   
   
   # Producer.
   client = Client("pulsar://localhost:6650")
   producer = client.create_producer(
       "non-persistent://public/default/test-topic",
       schema=AvroSchema(TestSchema)
   )
   
   
   # Producing loop.
   for i in range(1000):
       sub_schema = SubTestSchema(
           sub_test_attribute="x" * 30000
       )
       schema_1 = TestSchema(
           sub_schema=sub_schema,
           test_attribute="message 1"
       )
       schema_2 = TestSchema(
           sub_schema=sub_schema,
           test_attribute="message 2"
       )
       producer.send_async(
           schema_1,
           callback=None
       )
       producer.send_async(
           schema_2,
           callback=None
       )
       time.sleep(0.1)
   
   # Flush any queued asynchronous sends before the script exits.
   producer.flush()
   client.close()
   ```
   
   ### What did you expect to see?
   
   Both messages to be received by the single consumer.
   
   ### What did you see instead?
   
   In some cases (roughly once every 10 attempts), the second message is lost, and the command `bin/pulsar-admin topics stats <topic_url>` shows `msgInCounter` differing from `msgOutCounter`.
   ```json
   {
     "msgRateIn" : 1.3000029169898786,
     "msgThroughputIn" : 161273.26187032074,
     "msgRateOut" : 0.9833355636531143,
     "msgThroughputOut" : 121977.25999198967,
     "bytesInCounter" : 9676374,
     "msgInCounter" : 78,
     "bytesOutCounter" : 7318619,
     "msgOutCounter" : 59,
     "averageMsgSize" : 124056.07692307691,
     "msgChunkPublished" : false,
     "storageSize" : 0,
     "backlogSize" : 0,
     "publishRateLimitedTimes" : 0,
     "earliestMsgPublishTimeInBacklogs" : 0,
     "offloadedStorageSize" : 0,
     "lastOffloadLedgerId" : 0,
     "lastOffloadSuccessTimeStamp" : 0,
     "lastOffloadFailureTimeStamp" : 0,
     "waitingPublishers" : 0,
     "nonContiguousDeletedMessagesRanges" : 0,
     "nonContiguousDeletedMessagesRangesSerializedSize" : 0,
     "compaction" : {
       "lastCompactionRemovedEventCount" : 0,
       "lastCompactionSucceedTimestamp" : 0,
       "lastCompactionFailedTimestamp" : 0,
       "lastCompactionDurationTimeInMills" : 0
   
   ```
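The mismatch can be quantified from the stats output with a few lines of Python. This is a sketch assuming the JSON layout printed by `pulsar-admin topics stats`; `counter_deltas` is a hypothetical helper. It also assumes a single subscription, as in this report, since the topic-level `msgOutCounter` appears to aggregate across subscriptions, so a direct in/out comparison would not be meaningful with several.

```python
import json
from typing import Dict


def counter_deltas(stats_json: str) -> Dict[str, int]:
    """Difference between inbound and outbound topic-level counters.

    Only meaningful for a topic with exactly one subscription.
    """
    stats = json.loads(stats_json)
    return {
        "messages": stats["msgInCounter"] - stats["msgOutCounter"],
        "bytes": stats["bytesInCounter"] - stats["bytesOutCounter"],
    }
```

Given a JSON object holding just the four counters from the stats above (78/59 messages, 9676374/7318619 bytes), it returns `{"messages": 19, "bytes": 2357755}`.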
   
   ### Anything else?
   
   Adding a 100ms sleep between the 2 messages seems to resolve this issue. Below are the stats with the sleep in place.
   ```json
   {
     "msgRateIn" : 0.7666667280000049,
     "msgThroughputIn" : 94391.2908846366,
     "msgRateOut" : 0.766666623260558,
     "msgThroughputOut" : 94391.2779892138,
     "bytesInCounter" : 9601391,
     "msgInCounter" : 78,
     "bytesOutCounter" : 9601391,
     "msgOutCounter" : 78,
     "averageMsgSize" : 123119.06521739131,
     "msgChunkPublished" : false,
     "storageSize" : 0,
     "backlogSize" : 0,
     "publishRateLimitedTimes" : 0,
     "earliestMsgPublishTimeInBacklogs" : 0,
     "offloadedStorageSize" : 0,
     "lastOffloadLedgerId" : 0,
     "lastOffloadSuccessTimeStamp" : 0,
     "lastOffloadFailureTimeStamp" : 0,
     "waitingPublishers" : 0,
     "nonContiguousDeletedMessagesRanges" : 0,
     "nonContiguousDeletedMessagesRangesSerializedSize" : 0,
     "compaction" : {
       "lastCompactionRemovedEventCount" : 0,
       "lastCompactionSucceedTimestamp" : 0,
       "lastCompactionFailedTimestamp" : 0,
       "lastCompactionDurationTimeInMills" : 0
     },
     "msgDropRate" : 0.0,
     "publishers" : [ {
       "accessMode" : "Shared",
       "msgRateIn" : 0.0,
       "msgThroughputIn" : 0.0,
       "averageMsgSize" : 0.0,
       "chunkedMessageRate" : 0.0,
       "producerId" : 0,
       "supportsPartialProducer" : false,
       "metadata" : { },
       "msgDropRate" : 0.0,
       "address" : "/172.30.0.8:58812",
       "connectedSince" : "2022-12-12T19:32:45.159102Z",
       "clientVersion" : "2.10.2",
       "producerName" : "standalone-0-1"
     }, {
       "accessMode" : "Shared",
       "msgRateIn" : 0.7666667280000049,
       "msgThroughputIn" : 94391.2908846366,
       "averageMsgSize" : 123119.06521739131,
       "chunkedMessageRate" : 0.0,
       "producerId" : 0,
       "supportsPartialProducer" : false,
       "metadata" : { },
       "msgDropRate" : 0.0,
       "address" : "/172.30.0.8:58862",
       "connectedSince" : "2022-12-12T19:32:47.101782Z",
       "clientVersion" : "2.10.2",
       "producerName" : "standalone-0-12"
     }, {
       "accessMode" : "Shared",
       "msgRateIn" : 0.0,
       "msgThroughputIn" : 0.0,
       "averageMsgSize" : 0.0,
       "chunkedMessageRate" : 0.0,
       "producerId" : 0,
       "supportsPartialProducer" : false,
       "metadata" : { },
       "msgDropRate" : 0.0,
       "address" : "/172.30.0.8:58882",
       "connectedSince" : "2022-12-12T19:32:47.180522Z",
       "clientVersion" : "2.10.2",
       "producerName" : "standalone-0-14"
     }, {
       "accessMode" : "Shared",
       "msgRateIn" : 0.0,
       "msgThroughputIn" : 0.0,
       "averageMsgSize" : 0.0,
       "chunkedMessageRate" : 0.0,
       "producerId" : 0,
       "supportsPartialProducer" : false,
       "metadata" : { },
       "msgDropRate" : 0.0,
       "address" : "/172.30.0.8:58898",
       "connectedSince" : "2022-12-12T19:32:47.277764Z",
       "clientVersion" : "2.10.2",
       "producerName" : "standalone-0-16"
     } ],
     "subscriptions" : {
       "websocket-bced7bae" : {
         "msgRateOut" : 0.766666623260558,
         "msgThroughputOut" : 94391.2779892138,
         "bytesOutCounter" : 9601391,
         "msgOutCounter" : 78,
         "msgRateRedeliver" : 0.0,
         "messageAckRate" : 0.0,
         "chunkedMessageRate" : 0,
         "msgBacklog" : 0,
         "backlogSize" : 0,
         "earliestMsgPublishTimeInBacklog" : 0,
         "msgBacklogNoDelayed" : 0,
         "blockedSubscriptionOnUnackedMsgs" : false,
         "msgDelayed" : 0,
         "unackedMessages" : 0,
         "type" : "Exclusive",
         "msgRateExpired" : 0.0,
         "totalMsgExpired" : 0,
         "lastExpireTimestamp" : 0,
         "lastConsumedFlowTimestamp" : 0,
         "lastConsumedTimestamp" : 0,
         "lastAckedTimestamp" : 0,
         "lastMarkDeleteAdvancedTimestamp" : 0,
         "consumers" : [ {
           "msgRateOut" : 0.766666623260558,
           "msgThroughputOut" : 94391.2779892138,
           "bytesOutCounter" : 9601391,
           "msgOutCounter" : 78,
           "msgRateRedeliver" : 0.0,
           "messageAckRate" : 0.0,
           "chunkedMessageRate" : 0.0,
           "consumerName" : "f8ad791029",
           "availablePermits" : 922,
           "unackedMessages" : 0,
           "avgMessagesPerEntry" : 1,
           "blockedConsumerOnUnackedMsgs" : false,
           "lastAckedTimestamp" : 0,
           "lastConsumedTimestamp" : 1670873809511,
           "metadata" : { },
           "address" : "/172.30.0.8:58818",
           "connectedSince" : "2022-12-12T19:32:45.581123Z",
           "clientVersion" : "2.10.2"
         } ],
         "isDurable" : false,
         "isReplicated" : false,
         "allowOutOfOrderDelivery" : false,
         "consumersAfterMarkDeletePosition" : { },
         "nonContiguousDeletedMessagesRanges" : 0,
         "nonContiguousDeletedMessagesRangesSerializedSize" : 0,
         "subscriptionProperties" : { },
         "msgDropRate" : 0.0,
         "durable" : false,
         "replicated" : false
       }
     },
     "replication" : { }
   }
   
   ```
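The 100 ms workaround above can be factored into a small helper that pauses between consecutive sends. This is a minimal sketch, not a fix: `send_paced` is a hypothetical name, it only assumes an object with a `send_async(content, callback=...)` method like the pulsar-client `Producer` in the reproduction, and the 0.1 s default is the empirical value reported here rather than a documented limit.

```python
import time
from typing import Any, Callable, Sequence


def send_paced(producer: Any, records: Sequence[Any], gap_s: float = 0.1,
               sleep: Callable[[float], None] = time.sleep) -> None:
    """Send records one at a time with a fixed pause between consecutive sends.

    `producer` is assumed to expose send_async(content, callback=...).
    """
    for i, record in enumerate(records):
        if i > 0:
            sleep(gap_s)  # pause before every send except the first
        producer.send_async(record, callback=None)
```

In the reproduction, `send_paced(producer, [schema_1, schema_2])` would replace the two back-to-back `send_async` calls. The `sleep` parameter exists only so the pacing can be exercised without actually waiting.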
   
   ### Are you willing to submit a PR?
   
   - [ ] I'm willing to submit a PR!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] github-actions[bot] commented on issue #18894: [Bug] msgInCounter and msgOutCounter don't match

Posted by github-actions.
github-actions[bot] commented on issue #18894:
URL: https://github.com/apache/pulsar/issues/18894#issuecomment-1399147193

   The issue had no activity for 30 days; marking it with the Stale label.




[GitHub] [pulsar] Vincouux commented on issue #18894: [Bug] msgInCounter and msgOutCounter don't match

Posted by GitBox <gi...@apache.org>.
Vincouux commented on issue #18894:
URL: https://github.com/apache/pulsar/issues/18894#issuecomment-1361116693

   Here are the stats of the topic whose consumer does not receive all the messages when 2 messages are sent in quick succession.
   
   ```bash
   I have no name!@d707c4bf3cd6:/pulsar$ bin/pulsar-admin topics stats non-persistent://public/default/response
   ```
   
   ```json
   {
     "msgRateIn" : 0.0,
     "msgThroughputIn" : 0.0,
     "msgRateOut" : 0.0,
     "msgThroughputOut" : 0.0,
     "bytesInCounter" : 300398,
     "msgInCounter" : 4,
     "bytesOutCounter" : 231188,
     "msgOutCounter" : 3,
     "averageMsgSize" : 0.0,
     "msgChunkPublished" : false,
     "storageSize" : 0,
     "backlogSize" : 0,
     "publishRateLimitedTimes" : 0,
     "earliestMsgPublishTimeInBacklogs" : 0,
     "offloadedStorageSize" : 0,
     "lastOffloadLedgerId" : 0,
     "lastOffloadSuccessTimeStamp" : 0,
     "lastOffloadFailureTimeStamp" : 0,
     "waitingPublishers" : 0,
     "nonContiguousDeletedMessagesRanges" : 0,
     "nonContiguousDeletedMessagesRangesSerializedSize" : 0,
     "compaction" : {
       "lastCompactionRemovedEventCount" : 0,
       "lastCompactionSucceedTimestamp" : 0,
       "lastCompactionFailedTimestamp" : 0,
       "lastCompactionDurationTimeInMills" : 0
     },
     "msgDropRate" : 0.0,
     "publishers" : [ {
       "accessMode" : "Shared",
       "msgRateIn" : 0.0,
       "msgThroughputIn" : 0.0,
       "averageMsgSize" : 0.0,
       "chunkedMessageRate" : 0.0,
       "producerId" : 0,
       "supportsPartialProducer" : false,
       "metadata" : { },
       "msgDropRate" : 0.0,
       "address" : "/172.23.0.8:51554",
       "producerName" : "standalone-0-12",
       "connectedSince" : "2022-12-21T09:58:01.724145Z",
       "clientVersion" : "2.10.2"
     }, {
       "accessMode" : "Shared",
       "msgRateIn" : 0.0,
       "msgThroughputIn" : 0.0,
       "averageMsgSize" : 0.0,
       "chunkedMessageRate" : 0.0,
       "producerId" : 0,
       "supportsPartialProducer" : false,
       "metadata" : { },
       "msgDropRate" : 0.0,
       "address" : "/172.23.0.8:51588",
       "producerName" : "standalone-0-13",
       "connectedSince" : "2022-12-21T09:58:01.742844Z",
       "clientVersion" : "2.10.2"
     }, {
       "accessMode" : "Shared",
       "msgRateIn" : 0.0,
       "msgThroughputIn" : 0.0,
       "averageMsgSize" : 0.0,
       "chunkedMessageRate" : 0.0,
       "producerId" : 0,
       "supportsPartialProducer" : false,
       "metadata" : { },
       "msgDropRate" : 0.0,
       "address" : "/172.23.0.8:51590",
       "producerName" : "standalone-0-14",
       "connectedSince" : "2022-12-21T09:58:01.740524Z",
       "clientVersion" : "2.10.2"
     }, {
       "accessMode" : "Shared",
       "msgRateIn" : 0.0,
       "msgThroughputIn" : 0.0,
       "averageMsgSize" : 0.0,
       "chunkedMessageRate" : 0.0,
       "producerId" : 0,
       "supportsPartialProducer" : false,
       "metadata" : { },
       "msgDropRate" : 0.0,
       "address" : "/172.23.0.8:51530",
       "producerName" : "standalone-0-2",
       "connectedSince" : "2022-12-21T09:58:00.107789Z",
       "clientVersion" : "2.10.2"
     } ],
     "subscriptions" : {
       "websocket-a2fb2c1d" : {
         "msgRateOut" : 0.0,
         "msgThroughputOut" : 0.0,
         "bytesOutCounter" : 231188,
         "msgOutCounter" : 3,
         "msgRateRedeliver" : 0.0,
         "messageAckRate" : 0.0,
         "chunkedMessageRate" : 0,
         "msgBacklog" : 0,
         "backlogSize" : 0,
         "earliestMsgPublishTimeInBacklog" : 0,
         "msgBacklogNoDelayed" : 0,
         "blockedSubscriptionOnUnackedMsgs" : false,
         "msgDelayed" : 0,
         "unackedMessages" : 0,
         "type" : "Exclusive",
         "msgRateExpired" : 0.0,
         "totalMsgExpired" : 0,
         "lastExpireTimestamp" : 0,
         "lastConsumedFlowTimestamp" : 0,
         "lastConsumedTimestamp" : 0,
         "lastAckedTimestamp" : 0,
         "lastMarkDeleteAdvancedTimestamp" : 0,
         "consumers" : [ {
           "msgRateOut" : 0.0,
           "msgThroughputOut" : 0.0,
           "bytesOutCounter" : 231188,
           "msgOutCounter" : 3,
           "msgRateRedeliver" : 0.0,
           "messageAckRate" : 0.0,
           "chunkedMessageRate" : 0.0,
           "consumerName" : "99734a93b8",
           "availablePermits" : 997,
           "unackedMessages" : 0,
           "avgMessagesPerEntry" : 1,
           "blockedConsumerOnUnackedMsgs" : false,
           "lastAckedTimestamp" : 0,
           "lastConsumedTimestamp" : 1671616733404,
           "metadata" : { },
           "address" : "/172.23.0.8:51508",
           "connectedSince" : "2022-12-21T09:58:00.396967Z",
           "clientVersion" : "2.10.2"
         } ],
         "isDurable" : false,
         "isReplicated" : false,
         "allowOutOfOrderDelivery" : false,
         "consumersAfterMarkDeletePosition" : { },
         "nonContiguousDeletedMessagesRanges" : 0,
         "nonContiguousDeletedMessagesRangesSerializedSize" : 0,
         "subscriptionProperties" : { },
         "msgDropRate" : 0.0,
         "durable" : false,
         "replicated" : false
       }
     },
     "replication" : { }
   }
   ```




[GitHub] [pulsar] tjiuming commented on issue #18894: [Bug] msgInCounter and msgOutCounter don't match

Posted by GitBox <gi...@apache.org>.
tjiuming commented on issue #18894:
URL: https://github.com/apache/pulsar/issues/18894#issuecomment-1359950123

   Could you please provide the full stats output when msgInCounter != msgOutCounter?
   The stats output above is incomplete.




[GitHub] [pulsar] Vincouux commented on issue #18894: [Bug] msgInCounter and msgOutCounter don't match

Posted by GitBox <gi...@apache.org>.
Vincouux commented on issue #18894:
URL: https://github.com/apache/pulsar/issues/18894#issuecomment-1359109152

   Thanks for your answer @tjiuming, but this doesn't look like the expected behaviour. I'm used to seeing thousands of messages transferred across many different topics. In my experience, the only reason a message would not be delivered to a consumer is that the consumer's available permits have dropped to 0. That is not the case here: sending only 2 messages can result in the behaviour described above. I regret that I wasn't able to reproduce it, but maybe you can think of other reasons why a message might not be delivered.
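The permit argument above can be checked directly against the stats output. A minimal sketch, assuming the `subscriptions` to `consumers` layout shown in the stats dumps in this thread; `consumers_without_permits` is a hypothetical helper, and an empty result only rules out permit exhaustion, not other causes of loss.

```python
import json
from typing import List, Tuple


def consumers_without_permits(stats_json: str) -> List[Tuple[str, str, int]]:
    """List (subscription, consumer, availablePermits) for consumers with no permits left."""
    stats = json.loads(stats_json)
    starved = []
    for sub_name, sub in stats.get("subscriptions", {}).items():
        for consumer in sub.get("consumers", []):
            permits = consumer.get("availablePermits")
            # Skip consumers whose stats omit the field instead of guessing a value.
            if permits is not None and permits <= 0:
                starved.append((sub_name, consumer.get("consumerName", "?"), permits))
    return starved
```

For the stats posted earlier in this thread (`availablePermits` of 997 on the only consumer), the function returns an empty list, consistent with the point that permits were not exhausted.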




[GitHub] [pulsar] tjiuming commented on issue #18894: [Bug] msgInCounter and msgOutCounter don't match

Posted by GitBox <gi...@apache.org>.
tjiuming commented on issue #18894:
URL: https://github.com/apache/pulsar/issues/18894#issuecomment-1359026529

   NonPersistentTopic cannot guarantee that messages are not lost, so message loss may happen.
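Since loss is possible on non-persistent topics, one option when delivery matters is to run the same producer and consumer against a persistent topic; the two differ only in the domain prefix of the fully qualified name. This is an assumption beyond what the thread proposes, and the helpers below (hypothetical names) only validate and rewrite topic strings of the `{domain}://tenant/namespace/topic` form used in this issue.

```python
def topic_domain(topic: str) -> str:
    """Return 'persistent' or 'non-persistent' for a fully qualified topic name."""
    domain, sep, _ = topic.partition("://")
    if not sep or domain not in ("persistent", "non-persistent"):
        raise ValueError(f"not a fully qualified Pulsar topic name: {topic!r}")
    return domain


def as_persistent(topic: str) -> str:
    """Rewrite a topic name into the persistent domain, keeping tenant/namespace/topic."""
    topic_domain(topic)  # raises ValueError on malformed names
    return "persistent://" + topic.partition("://")[2]
```

For example, `as_persistent("non-persistent://public/default/test-topic")` returns `"persistent://public/default/test-topic"`.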
   

