You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2018/11/28 17:13:38 UTC
[kafka] branch trunk updated: MINOR: Support long maxMessages in
Trogdor consume/produce bench workers (#5957)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ec66818 MINOR: Support long maxMessages in Trogdor consume/produce bench workers (#5957)
ec66818 is described below
commit ec668180d797c4d08ea899de61c344438621ced7
Author: Stanislav Kozlovski <st...@outlook.com>
AuthorDate: Wed Nov 28 17:13:21 2018 +0000
MINOR: Support long maxMessages in Trogdor consume/produce bench workers (#5957)
Reviewers: Colin McCabe <cm...@apache.org>
---
.../java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java | 6 +++---
.../java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java | 2 +-
.../java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java | 6 +++---
.../java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java | 2 +-
4 files changed, 8 insertions(+), 8 deletions(-)
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java
index 0e239b0..b7e0172 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java
@@ -91,7 +91,7 @@ public class ConsumeBenchSpec extends TaskSpec {
private final String consumerNode;
private final String bootstrapServers;
private final int targetMessagesPerSec;
- private final int maxMessages;
+ private final long maxMessages;
private final Map<String, String> consumerConf;
private final Map<String, String> adminClientConf;
private final Map<String, String> commonClientConf;
@@ -105,7 +105,7 @@ public class ConsumeBenchSpec extends TaskSpec {
@JsonProperty("consumerNode") String consumerNode,
@JsonProperty("bootstrapServers") String bootstrapServers,
@JsonProperty("targetMessagesPerSec") int targetMessagesPerSec,
- @JsonProperty("maxMessages") int maxMessages,
+ @JsonProperty("maxMessages") long maxMessages,
@JsonProperty("consumerGroup") String consumerGroup,
@JsonProperty("consumerConf") Map<String, String> consumerConf,
@JsonProperty("commonClientConf") Map<String, String> commonClientConf,
@@ -146,7 +146,7 @@ public class ConsumeBenchSpec extends TaskSpec {
}
@JsonProperty
- public int maxMessages() {
+ public long maxMessages() {
return maxMessages;
}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java
index a44d521..1e80209 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java
@@ -233,7 +233,7 @@ public class ConsumeBenchWorker implements TaskWorker {
long bytesConsumed = 0;
long startTimeMs = Time.SYSTEM.milliseconds();
long startBatchMs = startTimeMs;
- int maxMessages = spec.maxMessages();
+ long maxMessages = spec.maxMessages();
try {
while (messagesConsumed < maxMessages) {
ConsumerRecords<byte[], byte[]> records = consumer.poll();
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
index c0bbd7e..d15172f 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
@@ -64,7 +64,7 @@ public class ProduceBenchSpec extends TaskSpec {
private final String producerNode;
private final String bootstrapServers;
private final int targetMessagesPerSec;
- private final int maxMessages;
+ private final long maxMessages;
private final PayloadGenerator keyGenerator;
private final PayloadGenerator valueGenerator;
private final Optional<TransactionGenerator> transactionGenerator;
@@ -80,7 +80,7 @@ public class ProduceBenchSpec extends TaskSpec {
@JsonProperty("producerNode") String producerNode,
@JsonProperty("bootstrapServers") String bootstrapServers,
@JsonProperty("targetMessagesPerSec") int targetMessagesPerSec,
- @JsonProperty("maxMessages") int maxMessages,
+ @JsonProperty("maxMessages") long maxMessages,
@JsonProperty("keyGenerator") PayloadGenerator keyGenerator,
@JsonProperty("valueGenerator") PayloadGenerator valueGenerator,
@JsonProperty("transactionGenerator") Optional<TransactionGenerator> txGenerator,
@@ -124,7 +124,7 @@ public class ProduceBenchSpec extends TaskSpec {
}
@JsonProperty
- public int maxMessages() {
+ public long maxMessages() {
return maxMessages;
}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
index abf5976..84b94d5 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
@@ -225,7 +225,7 @@ public class ProduceBenchWorker implements TaskWorker {
if (enableTransactions)
producer.initTransactions();
- int sentMessages = 0;
+ long sentMessages = 0;
while (sentMessages < spec.maxMessages()) {
if (enableTransactions) {
boolean tookAction = takeTransactionAction();