You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2024/01/22 15:01:56 UTC

(pulsar) branch branch-2.10 updated: [fix][test] Fix PerformanceProducer send count error (#21706)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new bbdb6c7a741 [fix][test] Fix PerformanceProducer send count error (#21706)
bbdb6c7a741 is described below

commit bbdb6c7a741bedaaa6778439d307cc00499ebd5d
Author: Jiwe Guo <te...@apache.org>
AuthorDate: Mon Jan 22 23:01:42 2024 +0800

    [fix][test] Fix PerformanceProducer send count error (#21706)
---
 .../java/org/apache/pulsar/testclient/PerformanceProducer.java | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
index 762ad61b837..788a6d4daf5 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
@@ -681,7 +681,7 @@ public class PerformanceProducer {
                 }
             }
             // Send messages on all topics/producers
-            long totalSent = 0;
+            AtomicLong totalSent = new AtomicLong(0);
             AtomicLong numMessageSend = new AtomicLong(0);
             Semaphore numMsgPerTxnLimit = new Semaphore(arguments.numMessagesPerTransaction);
             while (true) {
@@ -701,7 +701,7 @@ public class PerformanceProducer {
                     }
 
                     if (numMessages > 0) {
-                        if (totalSent++ >= numMessages) {
+                        if (totalSent.get() >= numMessages) {
                             log.info("------------- DONE (reached the maximum number: {} of production) --------------"
                                     , numMessages);
                             doneLatch.countDown();
@@ -719,7 +719,7 @@ public class PerformanceProducer {
 
                     if (arguments.payloadFilename != null) {
                         if (messageFormatter != null) {
-                            payloadData = messageFormatter.formatMessage(arguments.producerName, totalSent,
+                            payloadData = messageFormatter.formatMessage(arguments.producerName, totalSent.get(),
                                     payloadByteList.get(random.nextInt(payloadByteList.size())));
                         } else {
                             payloadData = payloadByteList.get(random.nextInt(payloadByteList.size()));
@@ -749,13 +749,13 @@ public class PerformanceProducer {
                     if (msgKeyMode == MessageKeyGenerationMode.random) {
                         messageBuilder.key(String.valueOf(random.nextInt()));
                     } else if (msgKeyMode == MessageKeyGenerationMode.autoIncrement) {
-                        messageBuilder.key(String.valueOf(totalSent));
+                        messageBuilder.key(String.valueOf(totalSent.get()));
                     }
                     PulsarClient pulsarClient = client;
                     messageBuilder.sendAsync().thenRun(() -> {
                         bytesSent.add(payloadData.length);
                         messagesSent.increment();
-
+                        totalSent.incrementAndGet();
                         totalMessagesSent.increment();
                         totalBytesSent.add(payloadData.length);