Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/01/08 14:24:30 UTC

[GitHub] [pulsar-client-go] bschofield opened a new issue #437: SendAsync() can stall with large BatchingMaxPublishDelay

bschofield opened a new issue #437:
URL: https://github.com/apache/pulsar-client-go/issues/437


   #### Observed behavior
   
   When using `SendAsync()` together with a large `BatchingMaxPublishDelay`, such that batch flushing is driven mainly by `BatchingMaxMessages`, send stalls can occur.
   
   I don't fully understand the cause of these, but increasing `MaxPendingMessages` seems to make them go away.
   
   It may be relevant that I am producing to a topic with several partitions.
   
   #### Steps to reproduce
   
   Create a producer with a large `BatchingMaxPublishDelay`, leaving the other options at their defaults, e.g.
   
       pulsar.ProducerOptions{
         Topic:                   topic,
         CompressionType:         pulsar.ZLib,
         BatchingMaxPublishDelay: 100 * time.Second,
       }
   
   Enable debug logging and produce to a partitioned topic with a reasonable number of partitions (in my case, six), using `SendAsync()`. Note that the debug log frequently stalls after a `Received send request` message and does not resume until a flush initiated by the max publish delay occurs.
   
   Increase `MaxPendingMessages` to 2000 and try again. The stalls now go away.
   
   For optimal throughput, it _seems_ that I need to set `MaxPendingMessages` to approximately `BatchingMaxMessages` multiplied by the number of partitions in the target topic.
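   
   For reference, here is a minimal sketch of the kind of test program I am using (broker URL, topic name, payloads, and message count are placeholders, not my exact code):
   
       package main
       
       import (
       	"context"
       	"fmt"
       	"log"
       	"time"
       
       	"github.com/apache/pulsar-client-go/pulsar"
       )
       
       func main() {
       	client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
       	if err != nil {
       		log.Fatal(err)
       	}
       	defer client.Close()
       
       	// Same options as above; only BatchingMaxPublishDelay differs from the defaults.
       	producer, err := client.CreateProducer(pulsar.ProducerOptions{
       		Topic:                   "persistent://public/default/my-partitioned-topic",
       		CompressionType:         pulsar.ZLib,
       		BatchingMaxPublishDelay: 100 * time.Second,
       	})
       	if err != nil {
       		log.Fatal(err)
       	}
       	defer producer.Close()
       
       	// Fire-and-forget sends; with the options above these eventually stall and only
       	// resume once a flush driven by the max publish delay occurs.
       	for i := 0; i < 100000; i++ {
       		producer.SendAsync(context.Background(), &pulsar.ProducerMessage{
       			Payload: []byte(fmt.Sprintf("message-%d", i)),
       		}, func(_ pulsar.MessageID, _ *pulsar.ProducerMessage, err error) {
       			if err != nil {
       				log.Println("send failed:", err)
       			}
       		})
       	}
       	if err := producer.Flush(); err != nil {
       		log.Println("flush failed:", err)
       	}
       }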
   
   #### System configuration
   
   Pulsar version: 2.6.1
   Client version: 71cc54f (current master)
   



[GitHub] [pulsar-client-go] bschofield edited a comment on issue #437: SendAsync() can stall with large BatchingMaxPublishDelay

Posted by GitBox <gi...@apache.org>.
bschofield edited a comment on issue #437:
URL: https://github.com/apache/pulsar-client-go/issues/437#issuecomment-756858296


   Another way to fix this seems to be to edit _batch_builder.go_, `batchContainer.Add()` so that it returns `false` when the current batch container holds too many messages, not just when it holds too many bytes, i.e. to change
   
       } else if bc.hasSpace(payload) {
       	// The current batch is full. Producer has to call Flush() to
       	return false
       }
   
   to
   
       } else if bc.IsFull() || bc.hasSpace(payload) {
       	// The current batch is full. Producer has to call Flush() to
       	return false
       }
   
   (A little confusingly, the `hasSpace()` function returns `true` if there is _not_ enough space.)
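   
   For illustration, here is a simplified model of what the two checks do (the field names and bodies are hypothetical, not the actual _batch_builder.go_ source):
   
       // Hypothetical, simplified view of the batch container state; not the real struct.
       type batchContainer struct {
       	numMessages  uint32 // messages already in the open batch
       	maxMessages  uint32 // BatchingMaxMessages
       	curBatchSize uint32 // bytes already in the open batch
       	maxBatchSize uint32 // byte limit for a single batch
       }
       
       // IsFull reports whether the batch already holds the maximum number of messages.
       func (bc *batchContainer) IsFull() bool {
       	return bc.numMessages >= bc.maxMessages
       }
       
       // hasSpace, despite its name, reports that there is NOT enough room left: it
       // returns true when appending payload would push the batch over its byte limit.
       func (bc *batchContainer) hasSpace(payload []byte) bool {
       	return bc.numMessages > 0 && bc.curBatchSize+uint32(len(payload)) > bc.maxBatchSize
       }
   
   With only `hasSpace()` in the condition, a batch that has reached `BatchingMaxMessages` but is still under the byte limit never makes `Add()` return `false`, so nothing forces a flush until the byte limit or the publish delay is hit.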
   
   It could be that increasing `MaxPendingMessages` is just a band-aid that works by letting the batch containers grow until they hit the byte-size limit, and that the real bug is here.
   
   However, I'm not confident enough that this is the correct fix to send a PR for it. Would appreciate @merlimat or @wolfstudy taking a look if you have time.





[GitHub] [pulsar-client-go] bschofield commented on issue #437: SendAsync() can stall with large BatchingMaxPublishDelay

Posted by GitBox <gi...@apache.org>.
bschofield commented on issue #437:
URL: https://github.com/apache/pulsar-client-go/issues/437#issuecomment-955191188


   I believe this is probably fixed by https://github.com/apache/pulsar-client-go/pull/528, so I am closing this issue.



[GitHub] [pulsar-client-go] bschofield closed issue #437: SendAsync() can stall with large BatchingMaxPublishDelay

Posted by GitBox <gi...@apache.org>.
bschofield closed issue #437:
URL: https://github.com/apache/pulsar-client-go/issues/437


   



[GitHub] [pulsar-client-go] bschofield edited a comment on issue #437: SendAsync() can stall with large BatchingMaxPublishDelay

Posted by GitBox <gi...@apache.org>.
bschofield edited a comment on issue #437:
URL: https://github.com/apache/pulsar-client-go/issues/437#issuecomment-756824096


   Profiling the goroutines in my test program while a send is stalled gives the following.
   
   ![Screenshot from 2021-01-08 15-33-43](https://user-images.githubusercontent.com/1534502/104033053-edcde400-51c6-11eb-90bf-b3ec76fa95d7.png)
   
   Note that `partitionProducer.internalSendAsync()` is blocked on semaphore acquisition. Since I don't have the `DisableBlockIfQueueFull` option set, I think this must be at line 586 of _producer_partition.go_:
   
       p.publishSemaphore.Acquire()
   
   When the `partitionProducer` is created at line 108, `publishSemaphore` is initialized with a size taken from `maxPendingMessages`:
   
       publishSemaphore: internal.NewSemaphore(int32(maxPendingMessages)),
   
   It is unclear to me why this semaphore size is inadequate, but it seems like it might be the source of the issue.
   
   Is there perhaps a case where the semaphore can fill before the message-count-based batch flush occurs, so that the corresponding release in `ReceivedSendReceipt()` never fires?
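   
   To make that question concrete, here is a toy model of the interaction I have in mind (purely hypothetical, not the client's actual code): one permit is taken per `SendAsync()`, and permits only come back via `ReceivedSendReceipt()` after a batch has been flushed and acknowledged.
   
       package main
       
       import "fmt"
       
       func main() {
       	const maxPendingMessages = 1000 // size of publishSemaphore
       	sem := make(chan struct{}, maxPendingMessages)
       
       	inBatch := 0 // messages sitting in the open batch, waiting for a flush
       	for i := 1; ; i++ {
       		select {
       		case sem <- struct{}{}: // Acquire() in internalSendAsync()
       			inBatch++
       		default:
       			// No count-based flush has run, so no send receipts arrived and no
       			// permits were released; a real Acquire() would now block until the
       			// max publish delay finally forces a flush.
       			fmt.Printf("stalled at send %d with %d messages still batched\n", i, inBatch)
       			return
       		}
       	}
       }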



[GitHub] [pulsar-client-go] bschofield edited a comment on issue #437: SendAsync() can stall with large BatchingMaxPublishDelay

Posted by GitBox <gi...@apache.org>.
bschofield edited a comment on issue #437:
URL: https://github.com/apache/pulsar-client-go/issues/437#issuecomment-757918698


   I worked around this bug by setting `BatchingMaxMessages = math.MaxInt32`, so that batching is driven only by the byte-size limit and the max publish delay (which I set to 1 second).
   
   This seems to give me throughput comparable to, or better than, the cgo client.
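   
   For reference, the resulting options look roughly like this (assuming `math` and `time` are imported; `topic` is a placeholder):
   
       pulsar.ProducerOptions{
       	Topic:           topic,
       	CompressionType: pulsar.ZLib,
       	// Effectively disable count-based batching, so batches are flushed only by
       	// the byte-size limit or the (now much shorter) max publish delay.
       	BatchingMaxMessages:     math.MaxInt32,
       	BatchingMaxPublishDelay: 1 * time.Second,
       }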

