You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2018/11/28 17:13:38 UTC

[kafka] branch trunk updated: MINOR: Support long maxMessages in Trogdor consume/produce bench workers (#5957)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ec66818  MINOR: Support long maxMessages in Trogdor consume/produce bench workers (#5957)
ec66818 is described below

commit ec668180d797c4d08ea899de61c344438621ced7
Author: Stanislav Kozlovski <st...@outlook.com>
AuthorDate: Wed Nov 28 17:13:21 2018 +0000

    MINOR: Support long maxMessages in Trogdor consume/produce bench workers (#5957)
    
    Reivewers: Colin McCabe <cm...@apache.org>
---
 .../java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java    | 6 +++---
 .../java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java  | 2 +-
 .../java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java    | 6 +++---
 .../java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java  | 2 +-
 4 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java
index 0e239b0..b7e0172 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java
@@ -91,7 +91,7 @@ public class ConsumeBenchSpec extends TaskSpec {
     private final String consumerNode;
     private final String bootstrapServers;
     private final int targetMessagesPerSec;
-    private final int maxMessages;
+    private final long maxMessages;
     private final Map<String, String> consumerConf;
     private final Map<String, String> adminClientConf;
     private final Map<String, String> commonClientConf;
@@ -105,7 +105,7 @@ public class ConsumeBenchSpec extends TaskSpec {
                             @JsonProperty("consumerNode") String consumerNode,
                             @JsonProperty("bootstrapServers") String bootstrapServers,
                             @JsonProperty("targetMessagesPerSec") int targetMessagesPerSec,
-                            @JsonProperty("maxMessages") int maxMessages,
+                            @JsonProperty("maxMessages") long maxMessages,
                             @JsonProperty("consumerGroup") String consumerGroup,
                             @JsonProperty("consumerConf") Map<String, String> consumerConf,
                             @JsonProperty("commonClientConf") Map<String, String> commonClientConf,
@@ -146,7 +146,7 @@ public class ConsumeBenchSpec extends TaskSpec {
     }
 
     @JsonProperty
-    public int maxMessages() {
+    public long maxMessages() {
         return maxMessages;
     }
 
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java
index a44d521..1e80209 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java
@@ -233,7 +233,7 @@ public class ConsumeBenchWorker implements TaskWorker {
             long bytesConsumed = 0;
             long startTimeMs = Time.SYSTEM.milliseconds();
             long startBatchMs = startTimeMs;
-            int maxMessages = spec.maxMessages();
+            long maxMessages = spec.maxMessages();
             try {
                 while (messagesConsumed < maxMessages) {
                     ConsumerRecords<byte[], byte[]> records = consumer.poll();
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
index c0bbd7e..d15172f 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
@@ -64,7 +64,7 @@ public class ProduceBenchSpec extends TaskSpec {
     private final String producerNode;
     private final String bootstrapServers;
     private final int targetMessagesPerSec;
-    private final int maxMessages;
+    private final long maxMessages;
     private final PayloadGenerator keyGenerator;
     private final PayloadGenerator valueGenerator;
     private final Optional<TransactionGenerator> transactionGenerator;
@@ -80,7 +80,7 @@ public class ProduceBenchSpec extends TaskSpec {
                          @JsonProperty("producerNode") String producerNode,
                          @JsonProperty("bootstrapServers") String bootstrapServers,
                          @JsonProperty("targetMessagesPerSec") int targetMessagesPerSec,
-                         @JsonProperty("maxMessages") int maxMessages,
+                         @JsonProperty("maxMessages") long maxMessages,
                          @JsonProperty("keyGenerator") PayloadGenerator keyGenerator,
                          @JsonProperty("valueGenerator") PayloadGenerator valueGenerator,
                          @JsonProperty("transactionGenerator") Optional<TransactionGenerator> txGenerator,
@@ -124,7 +124,7 @@ public class ProduceBenchSpec extends TaskSpec {
     }
 
     @JsonProperty
-    public int maxMessages() {
+    public long maxMessages() {
         return maxMessages;
     }
 
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
index abf5976..84b94d5 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
@@ -225,7 +225,7 @@ public class ProduceBenchWorker implements TaskWorker {
                     if (enableTransactions)
                         producer.initTransactions();
 
-                    int sentMessages = 0;
+                    long sentMessages = 0;
                     while (sentMessages < spec.maxMessages()) {
                         if (enableTransactions) {
                             boolean tookAction = takeTransactionAction();