You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/08/11 17:03:20 UTC

[pulsar] branch branch-2.11 updated: [imp][java client] lastBatchSendNanoTime initialization (#17058)

This is an automated email from the ASF dual-hosted git repository.

mmarshall pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new f1371fac9a1 [imp][java client] lastBatchSendNanoTime initialization (#17058)
f1371fac9a1 is described below

commit f1371fac9a1ba6e5ecba3094f318bd3aa4171337
Author: Michael Marshall <mm...@apache.org>
AuthorDate: Thu Aug 11 11:58:37 2022 -0500

    [imp][java client] lastBatchSendNanoTime initialization (#17058)
    
    ### Motivation
    
    https://github.com/apache/pulsar/pull/14185 had a minor bug in that the first message sent to the producer would always deliver because `lastBatchSendNanoTime` was not initialized correctly. In https://github.com/apache/pulsar/pull/15406, we improved the initialization, but I am concerned that it didn't completely solve the problem because the counter is initialized before grabbing the connection, which will often take longer than the `batchingMaxPublishDelayMicros`, which defaults to [...]
    
    If possible, I hope to get this merged and cherry-picked to branch-2.11 as soon as possible.
    
    ### Modifications
    
    * Update the `batchFlushTask()` method to handle the initialization case for `lastBatchSendNanoTime`.
    * Remove the initialization for `lastBatchSendNanoTime` in the constructor.
    
    ### Verifying this change
    
    This test is fairly trivial to verify, but is hard to test for. I manually verified using debug logs to see that the time is correctly initialized on the first run and not afterwards.
    
    ### Documentation
    
    - [x] `doc-not-needed`
    
    (cherry picked from commit ce419cbd2f3fe0b66ede580e3f4e19693c1f0240)
---
 .../main/java/org/apache/pulsar/client/impl/ProducerImpl.java    | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 210fc14eb66..64ed61bd995 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -255,7 +255,6 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
             }
             this.batchMessageContainer = (BatchMessageContainerBase) containerBuilder.build();
             this.batchMessageContainer.setProducer(this);
-            this.lastBatchSendNanoTime = System.nanoTime();
         } else {
             this.batchMessageContainer = null;
         }
@@ -2091,9 +2090,13 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         long microsSinceLastSend = TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - lastBatchSendNanoTime);
         if (microsSinceLastSend < conf.getBatchingMaxPublishDelayMicros()) {
             scheduleBatchFlushTask(conf.getBatchingMaxPublishDelayMicros() - microsSinceLastSend);
-            return;
+        } else if (lastBatchSendNanoTime == 0) {
+            // The first time a producer sends a message, the lastBatchSendNanoTime is 0.
+            lastBatchSendNanoTime = System.nanoTime();
+            scheduleBatchFlushTask(conf.getBatchingMaxPublishDelayMicros());
+        } else {
+            batchMessageAndSend(true);
         }
-        batchMessageAndSend(true);
     }
 
     // must acquire semaphore before enqueuing