Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/08/11 17:03:20 UTC
[pulsar] branch branch-2.11 updated: [imp][java client] lastBatchSendNanoTime initialization (#17058)
This is an automated email from the ASF dual-hosted git repository.
mmarshall pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new f1371fac9a1 [imp][java client] lastBatchSendNanoTime initialization (#17058)
f1371fac9a1 is described below
commit f1371fac9a1ba6e5ecba3094f318bd3aa4171337
Author: Michael Marshall <mm...@apache.org>
AuthorDate: Thu Aug 11 11:58:37 2022 -0500
[imp][java client] lastBatchSendNanoTime initialization (#17058)
### Motivation
https://github.com/apache/pulsar/pull/14185 had a minor bug: the first message sent to the producer was always delivered immediately, without waiting to be batched, because `lastBatchSendNanoTime` was not initialized correctly. https://github.com/apache/pulsar/pull/15406 improved the initialization, but I am concerned that it did not completely solve the problem: the counter is initialized before grabbing the connection, which will often take longer than the `batchingMaxPublishDelayMicros`, which defaults to [...]
If possible, I hope to get this merged and cherry-picked to branch-2.11 as soon as possible.
### Modifications
* Update the `batchFlushTask()` method to handle the initialization case for `lastBatchSendNanoTime`.
* Remove the initialization for `lastBatchSendNanoTime` in the constructor.
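
The branching described above can be sketched in isolation. This is a minimal illustration, not the actual `ProducerImpl` code: the class `BatchFlushSketch`, the injected `nanoClock`, and the `actions` log are hypothetical stand-ins for the producer, `System.nanoTime()`, and the real calls to `scheduleBatchFlushTask(...)` and `batchMessageAndSend(true)`. It also assumes that sending a batch refreshes `lastBatchSendNanoTime`, which this diff does not show.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical stand-in for the flush-task logic; not the real ProducerImpl.
class BatchFlushSketch {
    private final long maxPublishDelayMicros;
    private final LongSupplier nanoClock;   // injected so the sketch is testable
    private long lastBatchSendNanoTime;     // deliberately left at 0 by the constructor
    final List<String> actions = new ArrayList<>();

    BatchFlushSketch(long maxPublishDelayMicros, LongSupplier nanoClock) {
        this.maxPublishDelayMicros = maxPublishDelayMicros;
        this.nanoClock = nanoClock;
    }

    void batchFlushTask() {
        long microsSinceLastSend =
                TimeUnit.NANOSECONDS.toMicros(nanoClock.getAsLong() - lastBatchSendNanoTime);
        if (microsSinceLastSend < maxPublishDelayMicros) {
            // Too early: wait out the remainder of the publish delay.
            actions.add("reschedule:" + (maxPublishDelayMicros - microsSinceLastSend));
        } else if (lastBatchSendNanoTime == 0) {
            // First run: start the clock now and wait one full delay.
            lastBatchSendNanoTime = nanoClock.getAsLong();
            actions.add("reschedule:" + maxPublishDelayMicros);
        } else {
            // Delay elapsed: flush the batch.
            // Assumption: the real send path refreshes lastBatchSendNanoTime.
            lastBatchSendNanoTime = nanoClock.getAsLong();
            actions.add("send");
        }
    }

    public static void main(String[] args) {
        long[] now = {5_000_000L};                 // fake clock, in nanoseconds
        BatchFlushSketch sketch = new BatchFlushSketch(1_000, () -> now[0]);
        sketch.batchFlushTask();                   // first run initializes the timestamp
        now[0] = 5_400_000L;
        sketch.batchFlushTask();                   // 400 us elapsed, 600 us remaining
        now[0] = 6_000_000L;
        sketch.batchFlushTask();                   // full delay elapsed
        System.out.println(sketch.actions);        // prints [reschedule:1000, reschedule:600, send]
    }
}
```

Because the timestamp is set on the first run of the task rather than in the constructor, time spent before that first run (for example, establishing the connection) no longer counts against the publish delay.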
### Verifying this change
This change is fairly trivial to verify manually, but it is hard to write an automated test for. I verified it using debug logs, confirming that the time is initialized on the first run and not on later runs.
### Documentation
- [x] `doc-not-needed`
(cherry picked from commit ce419cbd2f3fe0b66ede580e3f4e19693c1f0240)
---
.../main/java/org/apache/pulsar/client/impl/ProducerImpl.java | 9 ++++++---
1 file changed, 6 insertions(+), 3 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 210fc14eb66..64ed61bd995 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -255,7 +255,6 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
}
this.batchMessageContainer = (BatchMessageContainerBase) containerBuilder.build();
this.batchMessageContainer.setProducer(this);
- this.lastBatchSendNanoTime = System.nanoTime();
} else {
this.batchMessageContainer = null;
}
@@ -2091,9 +2090,13 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
long microsSinceLastSend = TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - lastBatchSendNanoTime);
if (microsSinceLastSend < conf.getBatchingMaxPublishDelayMicros()) {
scheduleBatchFlushTask(conf.getBatchingMaxPublishDelayMicros() - microsSinceLastSend);
- return;
+ } else if (lastBatchSendNanoTime == 0) {
+ // The first time a producer sends a message, the lastBatchSendNanoTime is 0.
+ lastBatchSendNanoTime = System.nanoTime();
+ scheduleBatchFlushTask(conf.getBatchingMaxPublishDelayMicros());
+ } else {
+ batchMessageAndSend(true);
}
- batchMessageAndSend(true);
}
// must acquire semaphore before enqueuing